双向流式 API
在底层协议方面,双向流式 API 使用客户端 |
在双向流式 API 中,目标是让客户端发送零个或多个请求,并让服务器发送零个或多个响应。我们将使用这种方式来模拟一个类似于 updateTasks
的功能,但在这种功能中,我们将在每次删除后直接从 DeleteTasks
端点 API 获取反馈,而不是等待所有删除操作完成。
在继续实现之前,有一个需要明确的问题,那就是为什么不将 DeleteTasks
设计为服务器流式 API,或者将 updateTasks
设计为双向流式 API。
这两者的区别在于它们的 “破坏性” 程度。我们可以在客户端直接更新,即使请求还没有发送到服务器。如果出现任何错误,我们可以简单地查看客户端上的列表,并根据修改时间将其与服务器上的列表同步。而对于删除操作,则稍微复杂一些。我们可以将已删除的任务保留在客户端,待以后垃圾回收,或者需要将信息存储在其他地方,以便与服务器同步。这会增加一些额外的开销。
因此,我们将发送多个 DeleteTasksRequest
请求,对于每个请求,我们都将收到已删除的确认响应。如果发生错误,我们仍然可以确定在错误发生之前的任务已经在服务器上删除。我们的 RPC 和消息如下所示:
message DeleteTasksRequest { uint64 id = 1; } message DeleteTasksResponse { } service TodoService { //... rpc DeleteTasks(stream DeleteTasksRequest) returns (stream DeleteTasksResponse); }
protobuf
我们会重复发送要删除的任务的 ID,并且每当收到 DeleteTasksResponse
时,就意味着该任务已被删除。否则,我们会收到一个错误。
现在,在深入实现之前,让我们先看看数据库接口。
数据库演进
因为我们想要删除数据库中的 Task
对象,我们需要一个 deleteTask
函数。这个函数将接受一个要删除的 Task
对象的 ID,执行删除操作,并返回一个错误或 nil
。
我们可以在 server/db.go
中添加以下函数:
type db interface {
//...
deleteTask(id uint64) error
}
go
实现这个函数的方式与 updateTask
类似。不过,不同的是,在找到具有正确 ID 的任务时,我们将删除它,而不是更新它。我们可以在 server/in_memory.go
中这样实现:
func (d *inMemoryDb) deleteTask(id uint64) error {
for i, task := range d.tasks {
if task.Id == id {
d.tasks = append(d.tasks[:i], d.tasks[i+1:]...)
return nil
}
}
return fmt.Errorf("task with id %d not found", id)
}
go
这个 "slice trick" 的方法是,通过将当前任务之后的元素追加到前面的任务上,从而覆盖当前任务,实际上实现了删除操作。有了这个,我们现在就准备好实现 DeleteTasks
端点了。
实现 DeleteTasks
在实现实际的 DeleteTasks
端点之前,我们需要理解如何处理生成的代码。所以,我们首先使用以下命令生成代码:
$ protoc --go_out=. \ --go_opt=paths=source_relative \ --go-grpc_out=. \ --go-grpc_opt=paths=source_relative \ proto/todo/v1/*.proto
bash
如果我们检查 proto/todo/v1
文件夹中的 todo_grpc.pb.go
文件,我们应该会看到 TodoServiceServer
接口中添加了以下函数:
DeleteTasks(TodoService_DeleteTasksServer) error
go
这与 UpdateTasks
函数类似,因为我们同样是处理一个流,并返回一个错误或 nil
。然而,和 UpdateTasks
不同的是,我们现在使用 Send
和 Recv
函数,而不是 Send
和 SendAndClose
。
TodoService_DeleteTasksServer
被定义如下:
type TodoService_DeleteTasksServer interface {
Send(*DeleteTasksResponse) error
Recv() (*DeleteTasksRequest, error)
grpc.ServerStream
}
go
这意味着,在我们的实现中,我们可以调用 Recv
来获取 DeleteTasksRequest
,然后对每一个请求调用 Send
来发送 DeleteTasksResponse
。最后,由于我们正在处理一个流,我们仍然需要检查错误和 io.EOF
。当我们收到 io.EOF
时,我们就结束函数并返回:
func (s *server) DeleteTasks(stream pb.TodoService_DeleteTasksServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
s.d.deleteTask(req.Id)
stream.Send(&pb.DeleteTasksResponse{})
}
}
go
需要注意的一点是 stream.Send
调用。虽然这个调用很简单,但它是区分客户端流式和双向流式的关键。如果没有这个调用,我们实际上只是发送多个请求,并在最后服务器返回 nil
来关闭流。这就和 UpdateTasks
一样。但是,因为有了 Send
调用,我们现在在每次删除后都能得到直接的反馈。
从客户端调用 UpdateTasks
现在我们已经实现了 DeleteTasks
端点,我们可以从客户端调用它了。不过,在执行之前,我们需要查看生成的 TodoServiceClient
的代码。我们现在应该有以下函数:
DeleteTasks(ctx context.Context, opts ...grpc.CallOption) (TodoService_DeleteTasksClient, error)
go
这与我们在 ListTasks
和 UpdateTasks
函数中看到的类似,因为它返回一个我们可以与之交互的流对象。不过,正如你所猜到的,我们现在可以使用 Send
和 Recv
。TodoService_DeleteTasksClient
看起来像这样:
type TodoService_DeleteTasksClient interface {
Send(*DeleteTasksRequest) error
Recv() (*DeleteTasksResponse, error)
grpc.ClientStream
}
go
有了这些生成的代码和底层的 gRPC 框架,我们现在可以发送多个 DeleteTasksRequest
请求,并接收多个 DeleteTasksResponse
响应。
接下来,我们将在 client/main.go
中创建一个新函数,它将接受 DeleteTasksRequest
的可变参数。然后,我们会创建一个通道来帮助我们等待整个接收和发送过程完成。如果没有这个通道,我们会在完成之前就从函数中返回。这个通道将在一个 goroutine 中使用 Recv
方法来接收数据。每当我们在该 goroutine 中收到 io.EOF
时,我们就关闭通道。最后,我们将遍历所有请求并发送它们,一旦发送完成,我们会等待通道关闭。
现在这可能看起来有些抽象,但可以想象客户端的工作是需要同时使用 Recv
和 Send
,因此我们需要一些简单的并发代码:
func deleteTasks(c pb.TodoServiceClient, reqs ...*pb.DeleteTasksRequest) {
stream, err := c.DeleteTasks(context.Background())
if err != nil {
log.Fatalf("unexpected error: %v", err)
}
waitc := make(chan struct{})
go func() {
for {
_, err := stream.Recv()
if err == io.EOF {
close(waitc)
break
}
if err != nil {
log.Fatalf("error while receiving: %v\n", err)
}
log.Println("deleted tasks")
}
}()
for _, req := range reqs {
if err := stream.Send(req); err != nil {
return
}
}
if err := stream.CloseSend(); err != nil {
return
}
<-waitc
}
go
最后,在运行我们的服务器和客户端之前,我们可以在 main
函数中调用这个方法。我们将删除所有之前通过 addTasks
创建的任务,并尝试通过打印所有任务来验证任务是否已被删除:
fmt.Println("-------DELETE------")
deleteTasks(c, []*pb.DeleteTasksRequest{
{Id: id1},
{Id: id2},
{Id: id3},
}...)
printTasks(c)
fmt.Println("-------------------")
go
现在,我们可以先运行服务器:
$ go run ./server 0.0.0.0:50051 listening at 0.0.0.0:50051
bash
然后运行我们的客户端:
$ go run ./client 0.0.0.0:50051 //... -------DELETE------ 2023/03/31 18:54:21 deleted tasks 2023/03/31 18:54:21 deleted tasks 2023/03/31 18:54:21 deleted tasks -------------------
bash
请注意,这里我们收到了三个响应(三个 deleted tasks
),而不是像客户端流那样只收到一个响应。这是因为我们为每个请求都得到了一个响应。我们实际上实现了双向流式传输。
通过实现双向流式传输,我们可以在每次发送请求时从服务器得到反馈。这样,我们就能确保在客户端上更新资源,而不必等待服务器的响应或错误。这对于像我们这种需要实时更新的用例非常有用。