客户端流式 API
在底层协议方面,客户端流式 API 使用 |
对于客户端流式 API 端点,我们可以发送零个或多个请求,并获取一个响应。这是一个重要的概念,尤其是在实时上传数据的场景中。例如,假设我们在前端点击编辑按钮,启动一个编辑会话,并实时提交每个编辑操作。当然,由于我们不涉及这样复杂的前端功能,我们将专注于使 API 与这种功能兼容。
要定义一个客户端流式 API,我们只需要在参数声明中使用 stream
关键字,而不是 return
。之前,对于我们的服务器流式 API,我们有以下定义:
rpc ListTasks(ListTasksRequest) returns (stream ListTasksResponse);
现在,对于 UpdateTasks
,我们将有以下定义:
message UpdateTasksRequest {
Task task = 1;
}
message UpdateTasksResponse {
}
service TodoService {
// ...
rpc UpdateTasks(stream UpdateTasksRequest) returns (UpdateTasksResponse);
}
请注意,在这种情况下,我们在请求中使用的是完整的 Task
消息,而不是像在 AddTask
中那样使用分开的字段。这并不是一个错误,我们将在第六章进一步讨论这个问题。
这实际上意味着客户端发送多个请求,而服务器返回一个响应。
我们现在离实现我们的端点更近了一步。然而,在继续之前,让我们先讨论一下数据库。
数据库演进
在考虑实现 UpdateTasks
之前,我们需要了解如何与数据库进行交互。首先需要考虑的是,对于给定的任务,可以更新哪些信息。在我们的案例中,我们不希望客户端能够更新任务的 ID,因为 ID 是由数据库处理的细节。但是,对于其他所有信息,我们希望允许用户进行更新。当任务完成时,我们需要能够将 done
设置为 true
;当 Task
描述需要更新时,客户端应该能够在数据库中修改它;最后,当截止日期发生变化时,客户端也应该能够更新它。
知道了这些,我们可以定义数据库中 updateTask
函数的签名。它将接收任务 ID 和所有可以更改的信息作为参数,并返回一个错误或 nil
:
type db interface {
//...
updateTask(id uint64, description string, dueDate time.Time, done bool) error
}
同样,这看起来好像需要传递很多参数,但我们并不想将这个接口与任何生成的代码耦合。如果以后我们需要添加更多信息或移除某些信息,只需更新这个接口和相应的实现即可。
现在,为了实现这一功能,我们将进入 in_memory.go
文件。该函数将简单地遍历所有的任务,如果某个任务的 ID 与传入的 ID 匹配,我们将逐一更新所有字段:
func (d *inMemoryDb) updateTask(id uint64, description string, dueDate time.Time, done bool) error {
for i, task := range d.tasks {
if task.Id == id {
t := d.tasks[i]
t.Description = description
t.DueDate = timestamppb.New(dueDate)
t.Done = done
return nil
}
}
return fmt.Errorf("task with id %d not found", id)
}
这意味着每次我们接收到一个请求时,都将遍历所有任务。对于一个真正的数据库来说,这种方法效率不高,特别是当数据库变得很大的时候。然而,由于我们并没有使用真实的数据库,这对于本书中的使用场景来说已经足够了。
实现 UpdateTasks
为了实现 UpdateTasks
端点,我们需要从 proto
文件生成 Go 代码,并获取我们需要实现的函数签名。我们只需运行以下命令:
$ protoc --go_out=. \
--go_opt=paths=source_relative \
--go-grpc_out=. \
--go-grpc_opt=paths=source_relative \
proto/todo/v1/*.proto
然后,如果我们查看 proto/todo/v1/todo_grpc.pb.go
文件,我们现在会看到 TodoServiceServer
接口中多出了一个函数:
type TodoServiceServer interface {
//...
UpdateTasks(TodoService_UpdateTasksServer) error
//...
}
如我们所见,函数签名与 ListTasks
类似;不过,这次我们不再处理请求,而是直接处理 TodoService_UpdateTasksServer
类型的流。如果我们检查 TodoService_UpdateTasksServer
类型的定义,我们会看到以下内容:
type TodoService_UpdateTasksServer interface {
SendAndClose(*UpdateTasksResponse) error
Recv() (*UpdateTasksRequest, error)
grpc.ServerStream
}
我们已经熟悉了 Recv
函数,它让我们获取一个对象。现在,我们还拥有一个 SendAndClose
函数。这个函数让我们能够告诉客户端,我们已经完成了服务器端的工作。当客户端发送 io.EOF
时,我们可以使用这个函数来关闭流。
了解了这些后,我们可以实现我们的端点。我们将反复调用流上的 Recv
函数,如果接收到 io.EOF
,我们就使用 SendAndClose
函数;否则,我们将调用数据库上的 updateTask
函数:
func (s *server) UpdateTasks(stream pb.TodoService_UpdateTasksServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.UpdateTasksResponse{})
}
if err != nil {
return err
}
s.d.updateTask(
req.Task.Id,
req.Task.Description,
req.Task.DueDate.AsTime(),
req.Task.Done,
)
}
}
现在,我们应该能够触发这个 API 端点来实时更改给定的任务集合。接下来,让我们看看客户端如何调用这个端点。
从客户端调用 UpdateTasks
这次,由于我们正在使用客户端流式 API,我们将与之前的服务器流式处理相反。客户端将重复调用 Send
,而服务器将重复调用 Recv
。最后,客户端将调用 CloseAndRecv
函数,该函数在生成的代码中定义。
如果我们查看客户端端生成的 UpdateTasks
代码,我们会看到 TodoServiceClient
类型中的以下签名:
UpdateTasks(ctx context.Context, opts ...grpc.CallOption) (TodoService_UpdateTasksClient, error)
注意,UpdateTasks
函数现在不接受任何请求参数,但它会返回一个 TodoService_UpdateTasksClient
类型的流。正如前面提到的,这个类型包含两个函数:Send
和 CloseAndRecv
。如果我们查看生成的代码,定义如下:
type TodoService_UpdateTasksClient interface {
Send(*UpdateTasksRequest) error
CloseAndRecv() (*UpdateTasksResponse, error)
grpc.ClientStream
}
Send
用来发送 UpdateTasksRequest
,而 CloseAndRecv
会告诉服务器客户端已完成发送请求,并请求 UpdateTasksResponse
。
了解了这些后,我们可以在 client/main.go
文件中实现 UpdateTasks
函数。我们将调用 gRPC 客户端的 UpdateTasks
函数,这会返回一个流,然后我们通过这个流发送任务。一旦我们循环处理完所有需要更新的任务,就会调用 CloseAndRecv
函数:
func updateTasks(c pb.TodoServiceClient, reqs ...*pb.UpdateTasksRequest) {
stream, err := c.UpdateTasks(context.Background())
if err != nil {
log.Fatalf("unexpected error: %v", err)
}
for _, req := range reqs {
err := stream.Send(req)
if err != nil {
return
}
if req.Task != nil {
fmt.Printf("updated task with id: %d\n", req.Task.Id)
}
}
if _, err = stream.CloseAndRecv(); err != nil {
log.Fatalf("unexpected error: %v", err)
}
}
如您所见,在 updateTasks
中,我们需要传递零个或多个 UpdateTasksRequest
作为参数。为了获取所需的任务 ID 来创建任务实例并填充 UpdateTasksRequest
的 task
字段,我们将记录先前通过 addTasks
创建的任务 ID。因此,原来我们在 main
中是这样做的:
addTask(c, "This is a task", dueDate)
现在我们会有类似这样的代码:
id1 := addTask(c, "This is a task", dueDate)
id2 := addTask(c, "This is another task", dueDate)
id3 := addTask(c, "And yet another task", dueDate)
现在,我们可以创建一个 UpdateTasksRequest
数组,像这样:
[]*pb.UpdateTasksRequest{
{Task: &pb.Task{Id: id1, Description: "A better name for the task"}},
{Task: &pb.Task{Id: id2, DueDate: timestamppb.New(dueDate.Add(5 * time.Hour))}},
{Task: &pb.Task{Id: id3, Done: true}},
}
这意味着,ID 为 id1
的任务将更新为新的描述,ID 为 id2
的任务将更新为新的 due_date
值,而最后一个任务将被标记为已完成。
我们现在可以将客户端传递给 updateTasks
,并通过 …
操作符扩展这个数组作为可变参数。在 main
函数中,我们可以添加如下内容:
fmt.Println("-------UPDATE------")
updateTasks(c, []*pb.UpdateTasksRequest{
{Task: &pb.Task{Id: id1, Description: "A better name for the task"}},
{Task: &pb.Task{Id: id2, DueDate: timestamppb.New(dueDate.Add(5 * time.Hour))}},
{Task: &pb.Task{Id: id3, Done: true}},
}...)
printTasks(c)
fmt.Println("-------------------")
现在我们可以像之前一样运行代码。首先运行服务器:
$ go run ./server 0.0.0.0:50051
listening at 0.0.0.0:50051
然后运行客户端调用 API 端点:
$ go run ./client 0.0.0.0:50051
//...
-------UPDATE------
updated task with id: 1
updated task with id: 2
updated task with id: 3
id:1 description:"A better name for the task" due_date:{}
id:2 due_date:{seconds:1680267768 nanos:127075000}
id:3 done:true due_date:{}
-------------------
在继续之前,有一个重要的事情需要注意。你可能会对更新任务时丢失部分信息感到惊讶。这是因为 Protobuf
会在未设置字段时使用该字段的默认值——这意味着,如果客户端发送一个只包含 done
为 true
的 Task
对象,description
字段将被反序列化为空字符串,而 due_date
则会被反序列化为一个空的 google.protobuf.Timestamp
。目前,这种方式效率较低,因为我们需要重新发送所有信息才能更新一个字段。稍后在书中,我们将讨论如何解决这个问题。
现在,我们已经能够基于任务的 ID 实时更新多个任务。接下来,让我们看看最后一种 API 类型:双向流式处理。