双向流式 API

在底层协议方面,双向流式 API 使用客户端 Send Header,随后是服务器和/或客户端多个 Send Message,客户端半关闭流,最后服务器 Send Trailer

在双向流式 API 中,目标是让客户端发送零个或多个请求,并让服务器发送零个或多个响应。我们将使用这种方式来模拟一个类似于 updateTasks 的功能,但在这种功能中,我们将在每次删除后直接从 DeleteTasks 端点 API 获取反馈,而不是等待所有删除操作完成。

在继续实现之前,有一个需要明确的问题,那就是为什么不将 DeleteTasks 设计为服务器流式 API,或者将 updateTasks 设计为双向流式 API。 这两者的区别在于它们的 “破坏性” 程度。我们可以在客户端直接更新,即使请求还没有发送到服务器。如果出现任何错误,我们可以简单地查看客户端上的列表,并根据修改时间将其与服务器上的列表同步。而对于删除操作,则稍微复杂一些。我们可以将已删除的任务保留在客户端,待以后垃圾回收,或者需要将信息存储在其他地方,以便与服务器同步。这会增加一些额外的开销。

因此,我们将发送多个 DeleteTasksRequest 请求,对于每个请求,我们都将收到已删除的确认响应。如果发生错误,我们仍然可以确定在错误发生之前的任务已经在服务器上删除。我们的 RPC 和消息如下所示:

message DeleteTasksRequest {
    uint64 id = 1;
}

message DeleteTasksResponse {
}

service TodoService {
    //...
    rpc DeleteTasks(stream DeleteTasksRequest) returns (stream DeleteTasksResponse);
}
protobuf

我们会重复发送要删除的任务的 ID,并且每当收到 DeleteTasksResponse 时,就意味着该任务已被删除。否则,我们会收到一个错误。

现在,在深入实现之前,让我们先看看数据库接口。

数据库演进

因为我们想要删除数据库中的 Task 对象,我们需要一个 deleteTask 函数。这个函数将接受一个要删除的 Task 对象的 ID,执行删除操作,并返回一个错误或 nil

我们可以在 server/db.go 中添加以下函数:

type db interface {
    //...
    deleteTask(id uint64) error
}
go

实现这个函数的方式与 updateTask 类似。不过,不同的是,在找到具有正确 ID 的任务时,我们将删除它,而不是更新它。我们可以在 server/in_memory.go 中这样实现:

func (d *inMemoryDb) deleteTask(id uint64) error {
    for i, task := range d.tasks {
        if task.Id == id {
            d.tasks = append(d.tasks[:i], d.tasks[i+1:]...)
            return nil
        }
    }
    return fmt.Errorf("task with id %d not found", id)
}
go

这个 "slice trick" 的方法是,通过将当前任务之后的元素追加到前面的任务上,从而覆盖当前任务,实际上实现了删除操作。有了这个,我们现在就准备好实现 DeleteTasks 端点了。

实现 DeleteTasks

在实现实际的 DeleteTasks 端点之前,我们需要理解如何处理生成的代码。所以,我们首先使用以下命令生成代码:

$ protoc --go_out=. \
    --go_opt=paths=source_relative \
    --go-grpc_out=. \
    --go-grpc_opt=paths=source_relative \
    proto/todo/v1/*.proto
bash

如果我们检查 proto/todo/v1 文件夹中的 todo_grpc.pb.go 文件,我们应该会看到 TodoServiceServer 接口中添加了以下函数:

DeleteTasks(TodoService_DeleteTasksServer) error
go

这与 UpdateTasks 函数类似,因为我们同样是处理一个流,并返回一个错误或 nil。然而,和 UpdateTasks 不同的是,我们现在使用 SendRecv 函数,而不是 SendSendAndClose

TodoService_DeleteTasksServer 被定义如下:

type TodoService_DeleteTasksServer interface {
    Send(*DeleteTasksResponse) error
    Recv() (*DeleteTasksRequest, error)
    grpc.ServerStream
}
go

这意味着,在我们的实现中,我们可以调用 Recv 来获取 DeleteTasksRequest,然后对每一个请求调用 Send 来发送 DeleteTasksResponse。最后,由于我们正在处理一个流,我们仍然需要检查错误和 io.EOF。当我们收到 io.EOF 时,我们就结束函数并返回:

func (s *server) DeleteTasks(stream pb.TodoService_DeleteTasksServer) error {
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        s.d.deleteTask(req.Id)
        stream.Send(&pb.DeleteTasksResponse{})
    }
}
go

需要注意的一点是 stream.Send 调用。虽然这个调用很简单,但它是区分客户端流式和双向流式的关键。如果没有这个调用,我们实际上只是发送多个请求,并在最后服务器返回 nil 来关闭流。这就和 UpdateTasks 一样。但是,因为有了 Send 调用,我们现在在每次删除后都能得到直接的反馈。

从客户端调用 UpdateTasks

现在我们已经实现了 DeleteTasks 端点,我们可以从客户端调用它了。不过,在执行之前,我们需要查看生成的 TodoServiceClient 的代码。我们现在应该有以下函数:

DeleteTasks(ctx context.Context, opts ...grpc.CallOption)
(TodoService_DeleteTasksClient, error)
go

这与我们在 ListTasksUpdateTasks 函数中看到的类似,因为它返回一个我们可以与之交互的流对象。不过,正如你所猜到的,我们现在可以使用 SendRecvTodoService_DeleteTasksClient 看起来像这样:

type TodoService_DeleteTasksClient interface {
    Send(*DeleteTasksRequest) error
    Recv() (*DeleteTasksResponse, error)
    grpc.ClientStream
}
go

有了这些生成的代码和底层的 gRPC 框架,我们现在可以发送多个 DeleteTasksRequest 请求,并接收多个 DeleteTasksResponse 响应。

接下来,我们将在 client/main.go 中创建一个新函数,它将接受 DeleteTasksRequest 的可变参数。然后,我们会创建一个通道来帮助我们等待整个接收和发送过程完成。如果没有这个通道,我们会在完成之前就从函数中返回。这个通道将在一个 goroutine 中使用 Recv 方法来接收数据。每当我们在该 goroutine 中收到 io.EOF 时,我们就关闭通道。最后,我们将遍历所有请求并发送它们,一旦发送完成,我们会等待通道关闭。

现在这可能看起来有些抽象,但可以想象客户端的工作是需要同时使用 RecvSend,因此我们需要一些简单的并发代码:

func deleteTasks(c pb.TodoServiceClient, reqs ...*pb.DeleteTasksRequest) {
    stream, err := c.DeleteTasks(context.Background())

    if err != nil {
        log.Fatalf("unexpected error: %v", err)
    }

    waitc := make(chan struct{})

    go func() {
        for {
            _, err := stream.Recv()
            if err == io.EOF {
                close(waitc)
                break
            }
            if err != nil {
                log.Fatalf("error while receiving: %v\n", err)
            }
            log.Println("deleted tasks")
        }
    }()

    for _, req := range reqs {
        if err := stream.Send(req); err != nil {
            return
        }
    }
    if err := stream.CloseSend(); err != nil {
        return
    }

    <-waitc
}
go

最后,在运行我们的服务器和客户端之前,我们可以在 main 函数中调用这个方法。我们将删除所有之前通过 addTasks 创建的任务,并尝试通过打印所有任务来验证任务是否已被删除:

fmt.Println("-------DELETE------")
deleteTasks(c, []*pb.DeleteTasksRequest{
    {Id: id1},
    {Id: id2},
    {Id: id3},
}...)

printTasks(c)
fmt.Println("-------------------")
go

现在,我们可以先运行服务器:

$ go run ./server 0.0.0.0:50051
listening at 0.0.0.0:50051
bash

然后运行我们的客户端:

$ go run ./client 0.0.0.0:50051
//...
-------DELETE------
2023/03/31 18:54:21 deleted tasks
2023/03/31 18:54:21 deleted tasks
2023/03/31 18:54:21 deleted tasks
-------------------
bash

请注意,这里我们收到了三个响应(三个 deleted tasks),而不是像客户端流那样只收到一个响应。这是因为我们为每个请求都得到了一个响应。我们实际上实现了双向流式传输。

通过实现双向流式传输,我们可以在每次发送请求时从服务器得到反馈。这样,我们就能确保在客户端上更新资源,而不必等待服务器的响应或错误。这对于像我们这种需要实时更新的用例非常有用。