测试
开发生产级 API 的第一步是编写全面的测试,以确保业务需求得到满足,同时验证 API 的一致性和性能。第一部分主要通过单元测试和集成测试来处理,第二部分则通过负载测试来验证性能。
在本节的第一部分,我们将重点介绍如何对服务器进行单元测试。我们将为每种 API 类型编写一个测试,以了解如何将更多测试引入到项目中。在第二部分,我们将介绍 ghz,这是一个用于负载测试 gRPC API 的工具。我们将介绍该工具的不同选项,以及如何使用凭证、身份验证令牌作为头部等信息来进行 API 负载测试。
单元测试
如前所述,我们将专注于对服务器进行单元测试。在开始之前,重要的是要知道,本文中展示的测试并不是我们可以做的所有可能测试。为了保持本书的可读性,我将展示如何为每种 API 类型编写单元测试,并且您可以在 server/impl_test.go
文件中找到其他测试的示例。
在编写任何测试之前,我们需要进行一些设置。我们将编写一些模板代码,让不同的测试共享相同的服务器和连接。这主要是为了避免每次运行测试时都创建新的服务器和连接。然而,请注意,这些测试不是封闭的(non-hermetic)。这意味着,意外的状态可能会在多个测试之间共享,导致测试结果不稳定。我们将介绍一些方法来处理这种情况,并确保清除状态。
我们可以做的第一件事是创建一个假数据库。这就像我们之前做的 inMemoryDb
,事实上,FakeDb
是 inMemoryDb
的一个包装,但我们也将测试数据库连接的问题。
为此,我们将使用与 grpc.ServerOption
相同的模式。grpc.ServerOption
是一个函数,它将值应用到私有结构体中。一个示例是 grpc.Creds
:
func Creds(c credentials.TransportCredentials) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.creds = c
})
}
它返回一个函数,一旦调用,就会将 c
的值设置为 serverOptions
中的 creds
属性。注意,serverOptions
与 ServerOption
是不同的,这是一个私有结构体。
我们将创建一个函数来告诉我们数据库是否可用。稍后,如果不可用,我们将启用选项来返回错误。在 test_options.go
文件中,我们将有如下代码:
func IsAvailable(a bool) TestOption {
return newFuncTestOption(func(o *testOptions) {
o.isAvailable = a
})
}
我将留给您自行检查 test_options.go
文件的其余内容。那里的函数和结构体仅仅是为了编写 IsAvailable
函数并为 isAvailable
获取默认值而创建的一些实用工具和变量。
现在,我们可以创建 FakeDb
。如前所述,这是 inMemoryDb
的一个包装,并且它有一些选项。在 fake_db.go
中,我们可以有以下内容:
type FakeDb struct {
d *inMemoryDb
opts testOptions
}
func NewFakeDb(opt ...TestOption) *FakeDb {
opts := defaultTestOptions
for _, o := range opt {
o.apply(&opts)
}
return &FakeDb{
d: &inMemoryDb{},
opts: opts,
}
}
func (db *FakeDb) Reset() {
db.opts = defaultTestOptions
db.d = &inMemoryDb{}
}
我们现在可以通过多种方式创建 FakeDb
:
NewFakeDb()
NewFakeDb(IsAvailable(false))
我们还覆盖了 inMemoryDb
的函数,使得 FakeDb
实现了 db
接口,这样我们就可以用这个数据库实例化服务器。FakeDb
的每个函数都遵循相同的模式:我们检查数据库是否可用;如果不可用,就返回一个错误,如果可用,就返回 inMemoryDb
的结果。以下是 addTask
函数(在 fake_db.go
中)的示例:
func (db *FakeDb) addTask(description string, dueDate time.Time) (uint64, error) {
if !db.opts.isAvailable {
return 0, fmt.Errorf("couldn't access the database")
}
return db.d.addTask(description, dueDate)
}
现在,我们有了这个,我们可以更进一步,编写实际的单元测试了。我们现在需要创建一个服务器。但是,我们不希望这个服务器实际使用我们计算机上的端口。使用实际端口可能会导致测试不稳定,因为如果端口已经在使用,测试会直接返回错误,提示无法创建服务器实例。
为了解决这个问题,gRPC 提供了一个名为 bufconn
的包(grpc/test/bufconn
)。它允许我们创建一个缓冲连接,因此不需要使用端口。bufconn.Listen
将创建一个监听器,我们可以使用这个监听器来处理请求。在 server_test.go
中,我们将共享监听器和数据库作为全局变量。这样,我们就可以在所有测试结束后处置监听器,并且在测试中添加/清除数据库中的任务。此外,我们还将创建一个函数来返回一个 net.Conn
连接,这样我们就可以在测试中使用它来创建客户端:
import (
"context"
"log"
"net"
pb "github.com/PacktPublishing/gRPC-Go-for-Professionals/proto/todo/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
)
const bufSize = 1024 * 1024
var lis *bufconn.Listener
var fakeDb *FakeDb = NewFakeDb()
func init() {
lis = bufconn.Listen(bufSize)
s := grpc.NewServer()
var testServer *server = &server{
d: fakeDb,
}
pb.RegisterTodoServiceServer(s, testServer)
go func() {
if err := s.Serve(lis); err != nil && err.Error() != "closed" {
log.Fatalf("Server exited with error: %v\n", err)
}
}()
}
func bufDialer(context.Context, string) (net.Conn, error) {
return lis.Dial()
}
首先需要注意的是,我们使用 Go 的 init()
函数在测试开始之前进行此设置。然后,注意我们创建了一个服务器实例并注册了我们的 TodoService
实现。最后,服务器在一个 goroutine 中运行。因此,我们需要确保取消该 goroutine。
我们几乎完成了模板代码。我们需要创建一个客户端,使用 bufDialer
函数通过缓冲连接连接到服务器。在 impl_test.go
中,我们将创建一个函数,返回 TodoServiceClient
和 grpc.ClientConn
。第一个用于调用我们的端点,第二个是为了在每个测试结束时关闭客户端连接:
func newClient(t *testing.T) (*grpc.ClientConn, pb.TodoServiceClient) {
ctx := context.Background()
creds := grpc.WithTransportCredentials(insecure.NewCredentials())
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), creds)
if err != nil {
t.Fatalf("failed to dial bufnet: %v", err)
}
return conn, pb.NewTodoServiceClient(conn)
}
在这里需要理解的一个重要点是,我们并不是在测试 main.go
中编写的整个服务器,而只是测试我们的端点实现。这就是为什么我们可以使用不安全的凭证连接到服务器。拦截器、加密等应该在集成测试中进行测试。
最后,我们可以创建一个小的实用函数,检查错误是否为 gRPC 错误,并且错误信息是否符合预期:
func errorIs(err error, code codes.Code, msg string) bool {
if err != nil {
if s, ok := status.FromError(err); ok {
if code == s.Code() && s.Message() == msg {
return true
}
}
}
return false
}
现在,我们已经准备好编写一些单元测试。我们将创建一个函数来运行所有单元测试,并在所有子测试完成后处置监听器:
func TestRunAll(t *testing.T) {}
现在,我们可以像这样将测试添加到 TestRunAll
函数中:
func TestRunAll(t *testing.T) {
t.Run("AddTaskTests", func(t *testing.T) {
//...
})
t.Cleanup(func() {
lis.Close()
})
}
接下来,让我们编写 testAddTaskEmptyDescription
函数,检查当我们发送一个空描述的请求时,是否会得到错误。我们将创建一个新的客户端实例,创建一个空请求,发送给 AddTask
,然后检查错误代码是否为未知(Unknown
),错误消息是否为 invalid AddTaskRequest.Description: value length must be at least 1 runes
(来自 protoc-gen-validate
):
const (
errorInvalidDescription = "invalid AddTaskRequest.Description: value length must be at least 1 runes"
)
func testAddTaskEmptyDescription(t *testing.T) {
conn, c := newClient(t)
defer conn.Close()
req := &pb.AddTaskRequest{}
_, err := c.AddTask(context.TODO(), req)
if !errorIs(err, codes.Unknown, errorInvalidDescription) {
t.Errorf("expected Unknown with message \"%s\", got %v", errorInvalidDescription, err)
}
}
然后,我们可以将其添加到 TestRunAll
中,如下所示:
func TestRunAll(t *testing.T) {
t.Run("AddTaskTests", func(t *testing.T) {
t.Run("TestAddTaskEmptyDescription", testAddTaskEmptyDescription)
}
//...
}
为了运行这个测试,我们可以在根目录下运行以下命令:
$ go test -run ^TestRunAll$ ./server
ok
现在,在继续查看如何测试流式传输之前,让我们先看看如何在数据库不可用的情况下进行测试。这几乎和我们在 testAddTaskEmptyDescription
中做的相同,但我们这次将覆盖数据库。最后,我们将检查是否收到了内部错误,并重置数据库(以清除选项):
const (
//...
errorNoDatabaseAccess = "unexpected error: couldn't access the database"
)
func testAddTaskUnavailableDb(t *testing.T) {
conn, c := newClient(t)
defer conn.Close()
newDb := NewFakeDb(IsAvailable(false))
*fakeDb = *newDb
req := &pb.AddTaskRequest{
Description: "test",
DueDate: timestamppb.New(time.Now().Add(5 * time.Hour)),
}
_, err := c.AddTask(context.TODO(), req)
fakeDb.Reset()
if !errorIs(err, codes.Internal, errorNoDatabaseAccess) {
t.Errorf("expected Internal, got %v", err)
}
}
我们可以看到,测试数据库故障是非常容易的。这就是所有单一 RPC 的内容。我会让你将 testAddTaskUnavailableDb
添加到 TestRunAll
中,并查看 impl_test.go
中与 AddTasks 相关的其他测试。
接下来,我们将测试 ListTasks
。我们将一些任务添加到假的数据库中,调用 ListTasks
,确保没有错误,并检查 ListTasks
是否遍历了所有任务:
func testListTasks(t *testing.T) {
conn, c := newClient(t)
defer conn.Close()
fakeDb.d.tasks = []*pb.Task{
{}, {}, {}, // 3 个空任务
}
expectedRead := len(fakeDb.d.tasks)
req := &pb.ListTasksRequest{}
count := 0
res, err := c.ListTasks(context.TODO(), req)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
for {
_, err := res.Recv()
if err == io.EOF {
break
}
if err != nil {
t.Errorf("error while reading stream: %v", err)
}
count++
}
if count != expectedRead {
t.Errorf(
"expected reading %d tasks, read %d",
expectedRead, count,
)
}
}
在调用 API 方面没有什么新东西。我们从编写客户端时已经了解了这一切。然而,这个测试的主要不同之处在于,我们不查看值,而只是断言我们循环的次数。当然,你可以基于此创建更复杂的测试,但我想向你展示一个简单的服务器流式 API 测试,供你构建。
接下来,让我们测试客户端流式 API 端点。由于我们正在处理 UpdateTasks
端点,我们需要在数据库中设置数据。之后,我们将创建一个 UpdateTasksRequest
数组,用于更新数据库中的所有条目,发送请求并检查所有更新是否无误:
func testUpdateTasks(t *testing.T) {
conn, c := newClient(t)
defer conn.Close()
fakeDb.d.tasks = []*pb.Task{
{Id: 0, Description: "test1"},
{Id: 1, Description: "test2"},
{Id: 2, Description: "test3"},
}
requests := []*pb.UpdateTasksRequest{
{Id: 0}, {Id: 1}, {Id: 2},
}
expectedUpdates := len(requests)
stream, err := c.UpdateTasks(context.TODO())
count := 0
if err != nil {
t.Errorf("unexpected error: %v", err)
}
for _, req := range requests {
if err := stream.Send(req); err != nil {
t.Fatal(err)
}
count++
}
_, err = stream.CloseAndRecv()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if count != expectedUpdates {
t.Errorf(
"expected updating %d tasks, updated %d",
expectedUpdates, count,
)
}
}
这与之前的测试类似。我们使用计数器来检查所有更新是否 “应用”。在集成测试中,你需要检查数据库中实际的值是否发生了变化;但是,因为我们处在单元测试中,并且使用的是内存数据库,检查实际的值并没有太大意义。
最后,我们将测试双向流式 API。这在测试环境中稍微复杂一些,但我们将一步步解决这个问题。之前,在客户端中,当 goroutine 出现错误时,我们只是运行了 log.Fatalf
来退出。然而,在这里,由于我们希望跟踪错误,并且不能在与测试不同的 goroutine 中调用 t.Fatalf
,我们将使用一个名为 countAndError
的结构体通道。顾名思义,这个结构体包含一个计数器和一个可选的错误:
type countAndError struct {
count int
err error
}
这很有用,因为现在,我们可以等待 goroutine 完成并通过通道获取结果。首先,让我们创建一个发送所有请求的函数。这个函数叫做 sendRequestsOverStream
,并将在一个单独的 goroutine 中调用:
func sendRequestsOverStream(stream pb.TodoService_DeleteTasksClient, requests []*pb.DeleteTasksRequest, waitc chan countAndError) {
for _, req := range requests {
if err := stream.Send(req); err != nil {
waitc <- countAndError{err: err}
close(waitc)
return
}
}
if err := stream.CloseSend(); err != nil {
waitc <- countAndError{err: err}
close(waitc)
}
}
如果发生错误,我们将通过设置错误的 countAndError
结构体来关闭等待通道。然后,我们可以创建一个读取响应的函数。这个函数叫做 readResponsesOverStream
,也将在一个单独的 goroutine 中调用:
func readResponsesOverStream(stream pb.TodoService_DeleteTasksClient, waitc chan countAndError) {
count := 0
for {
_, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
waitc <- countAndError{err: err}
close(waitc)
return
}
count++
}
waitc <- countAndError{count: count}
close(waitc)
}
这次,如果一切顺利,通道将收到一个包含计数的 countAndError
。这个计数与我们在之前的测试中做的相同,用来检查收集到的响应数量是否没有错误。
现在我们有了这两个函数,准备好编写实际的双向流式 API 测试了。这与我们为 ListTasks
和 UpdateTasks
所做的类似;不过这次,我们启动两个 goroutine,等待结果,并检查是否没有错误且计数与请求数量相等:
func testDeleteTasks(t *testing.T) {
conn, c := newClient(t)
defer conn.Close()
fakeDb.d.tasks = []*pb.Task{
{Id: 1}, {Id: 2}, {Id: 3},
}
expectedRead := len(fakeDb.d.tasks)
waitc := make(chan countAndError)
requests := []*pb.DeleteTasksRequest{
{Id: 1}, {Id: 2}, {Id: 3},
}
stream, err := c.DeleteTasks(context.TODO())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
go sendRequestsOverStream(stream, requests, waitc)
go readResponsesOverStream(stream, waitc)
countAndError := <-waitc
if countAndError.err != nil {
t.Errorf("expected error: %v", countAndError.err)
}
if countAndError.count != expectedRead {
t.Errorf(
"expected reading %d responses, read %d",
expectedRead, countAndError.count,
)
}
}
到此为止,我们已经完成了对所有不同类型的 gRPC API 的测试。再次强调,还有更多的测试可以做,impl_test.go
中也有其他示例。我强烈建议你查看那里,以便获得更多的思路。
在将所有这些测试添加到 TestRunAll
后,你应该能够像这样运行它们:
$ go test -run ^TestRunAll$ ./server
ok
如果你想要更详细的测试输出,可以添加 -v
选项。这将返回类似以下的内容:
$ go test -run ^TestRunAll$ -v ./server
--- PASS: TestRunAll
--- PASS: TestRunAll/AddTaskTests
--- PASS: TestRunAll/AddTaskTests/TestAddTaskUnavailableDb
--- PASS:
//...
PASS
Bazel
为了使用 Bazel
运行测试,你可以运行 Gazelle
来生成 //server:server_test
目标:
$ bazel run //:gazelle
然后你会在 server/BUILD.bazel
中找到这个目标,你应该能够运行以下命令:
$ bazel run //server:server_test
PASS
如果你想要更详细的测试输出,可以使用 --test_arg
选项并将其设置为 -test.v
,这样你会得到类似下面的输出:
$ bazel run //server:server_test --test_arg=-test.v
--- PASS: TestRunAll
--- PASS: TestRunAll/AddTaskTests
--- PASS: TestRunAll/AddTaskTests/TestAddTaskUnavailableDb
--- PASS:
//...
PASS
总结来说,我们展示了如何测试单一请求、服务器流式请求、客户端流式请求和双向流式请求 API。我们还看到,当使用 bufconn
时,运行测试时不需要使用机器上的端口。这使得我们的测试不再依赖于它运行的环境。最后,我们还展示了如何使用假数据(fakes)来测试系统依赖。这部分内容超出了本书的范围,但我认为它非常重要,值得一提,你即使使用 gRPC,也能编写常规的测试。
负载测试
测试服务时,另一个重要步骤是确保它们的效率,并且能够处理特定的负载。为此,我们使用负载测试工具来并发地向我们的服务发送请求。ghz
是一个专门做这件事的工具。在这一部分,我们将介绍如何使用这个工具,并且讲解一些我们需要设置的选项来测试我们的 API。
ghz
是一个高度可配置的工具。运行以下命令来查看和理解输出:
$ ghz --help
显然,我们不会使用所有这些选项,但我们会检查一些常见的选项以及在特定情况下需要使用的选项。让我们首先尝试进行一个简单的调用。
为了运行以下负载测试,你需要在 |
首先,我们运行服务器:
$ go run ./server 0.0.0.0:50051 0.0.0.0:50052
metrics server listening at 0.0.0.0:50052
gRPC server listening at 0.0.0.0:50051
接下来,我们将讨论四个最常用的选项。我们需要能够指定我们要调用的服务和方法(--call
),指定服务定义所在的 proto 文件(--proto
)以及导入文件的路径(--import_paths
),最后,指定要发送的请求数据。在我们的案例中,一个基本的命令,假设你在 chapter9
文件夹下运行,应该像这样:
$ ghz --proto ./proto/todo/v2/todo.proto \
--import-paths=proto \
--call todo.v2.TodoService.AddTask \
--data '{"description":"task"}' \
0.0.0.0:50051
然而,如果你尝试运行这个命令,你会遇到如下的错误信息:
connection error: desc = "transport: authentication handshake failed: tls: failed to verify certificate: x509: "test-server1" certificate is not standards compliant"
如你所见,这是因为我们设置了服务器只接受安全连接。为了解决这个问题,我们将使用 --cacert
选项,它允许我们指定 CA 证书的路径。如果你还记得,我们在客户端的代码中做了类似的设置,ghz
同样需要这些信息:
$ ghz #... \
--cacert ./certs/ca_cert.pem \
0.0.0.0:50051
运行这个命令,你会遇到相同的错误。原因是证书与域名相关联,这意味着只有来自某个特定域名的请求才能被接受。然而,由于我们在本地工作,这并不符合要求,因此会失败。为了解决这个问题,我们将使用 --cname
选项来覆盖发送请求的域名,以符合证书的要求:
$ ghz #... \
--cacert ./certs/ca_cert.pem \
--cname "check.test.example.com" \
0.0.0.0:50051
这里,我们使用 check.test.example.com
,因为我们从 grpc/grpc-go 示例 下载的证书是以 DNS 名称 *.test.example.com
生成的(参见 openssl.cnf
)。此外,请注意,--cacert
和 --cname
选项仅适用于自签名证书。通常,除非是特定情况,这些证书仅用于测试和非生产环境。
现在,如果你运行之前的命令,你应该得到以下错误:
Unauthenticated desc = failed to get auth_token
这应该引起警觉。这是我们在身份验证拦截器中发送的错误信息,当客户端没有提供 auth_token
元数据时会发生这种错误。为了发送这个元数据,我们将使用 --metadata
选项,它接受一个 JSON 字符串作为键值对:
ghz #... \
--metadata '{"auth_token":"authd"}' \
0.0.0.0:50051
运行了这些选项后,我们应该能够运行第一个负载测试(你的结果可能与以下不同):
$ ghz --proto ./proto/todo/v2/todo.proto \
--import-paths=proto \
--call todo.v2.TodoService.AddTask \
--data '{"description":"task"}' \
--cacert ./certs/ca_cert.pem \
--cname "check.test.example.com" \
--metadata '{"auth_token":"authd"}' \
0.0.0.0:50051
Summary:
Count: 200
Total: 22.89 ms
Slowest: 16.70 ms
Fastest: 0.20 ms
Average: 4.60 ms
Requests/sec: 8736.44
Response time histogram:
0.204 [1] |
1.854 [111] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
3.504 [38] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎
5.153 [0] |
6.803 [0] |
8.453 [0] |
10.103 [0] |
11.753 [0] |
13.403 [2] |∎
15.053 [26] |∎∎∎∎∎∎∎∎∎
16.703 [22] |∎∎∎∎∎∎∎∎
Latency distribution:
10 % in 0.33 ms
25 % in 0.78 ms
50 % in 1.75 ms
75 % in 2.39 ms
90 % in 15.12 ms
95 % in 15.31 ms
99 % in 16.48 ms
Status code distribution:
[OK] 200 responses
在这个总结中,有很多内容可以分析,但我们重点关注一些有趣的点。首先是请求的数量。我们可以看到,在这个测试中我们发送了 200 个请求。这是默认的请求数量,我们可以通过使用 --total
选项来更改这个数量(例如,500)。
然后,在响应时间直方图中,我们可以看到,200 个请求中有 111 个请求的执行时间大约为 2.29 毫秒。另一个有趣的点是我们有一些命令(50 个)运行时间超过了 13 毫秒。如果我们在生产环境中,可能需要进一步调查这些“高”执行时间的原因。这很大程度上取决于使用场景和需求。在我们的情况下,这几乎可以肯定是由于我们使用的低效的“数据库”,或者更准确地说,是 inMemoryDb.addTask
中重复调用的 append
导致的。
接下来是执行时间分布。我们可以看到,75% 的请求在 2.39 毫秒内执行完毕。事实上,这和之前提供的信息类似。如果我们将执行时间小于 3.504 毫秒的请求加起来并计算百分比,得到的结果是 (1 + 111 + 38) * 100 / 200 = 75%
。
然后,我们看到状态码分布。在我们的情况下,200 个请求都成功了。然而,在生产环境中,你可能会看到类似下面的结果(来自 ghz
文档):
Status code distribution:
[Unavailable] 3 responses
[PermissionDenied] 3 responses
[OK] 186 responses
[Internal] 8 responses
最后,还有一个我们在这里看不到的内容(因为我们没有错误):错误分布。这是错误消息的分布。再次强调,在生产环境中,你可能会看到类似这样的错误分布(来自 ghz
文档):
Error distribution:
[8] rpc error: code = Internal desc = Internal error.
[3] rpc error: code = PermissionDenied desc = Permission denied.
[3] rpc error: code = Unavailable desc = Service unavailable.
显然,我们可以使用这个工具做更多的事情。如前所述,它是高度可配置的,甚至可以将结果链接到 Grafana( https://ghz.sh/docs/extras )进行可视化。然而,这超出了本书的范围。我会把它留给你自己去尝试不同的选项,并使用 ghz
测试我们其他 API 端点的性能。
总而言之,我们看到了如何使用 ghz
对我们的服务进行负载测试。我们只看到了如何使用它测试我们的单一 API,但它同样适用于测试所有其他流式 API。在执行 ghz
命令后,我们看到了可以获取关于延迟、错误代码、错误消息分布、以及最快和最慢执行时间的信息。所有这些都非常有用,但需要理解的是,当与 Grafana 等可视化工具结合使用时,它的功能将更加强大。