cancel 调用
当您希望根据某些条件停止调用或中断一个长时间运行的流时,gRPC 提供了 cancel
函数,您可以随时执行它们。
如果您之前在分布式系统代码或 API 中使用过 Go,您可能已经遇到过一个名为 context
的类型。这是提供请求范围信息和跨 API 各个参与者进行信号传递的惯用方法,也是 gRPC 中的重要组成部分。
如果您没有注意到,直到现在,我们每次发起请求时都使用了 context.Background()
。在 Go 文档中,这被描述为 “返回一个非空的空 Context,它永远不会取消,没有任何值,也没有截止日期”。正如您所猜测的,这个方式并不适合生产就绪的 API,原因有以下几点:
-
如果用户希望提前终止请求怎么办?
-
如果 API 调用永远没有返回怎么办?
-
如果我们需要服务器了解全局值(例如身份验证令牌)怎么办?
在本节中,我们将重点讨论第一个问题,接下来的两节我们将回答其他问题。
为了获得取消调用的能力,我们将使用 context
包中的 WithCancel
函数( https://pkg.go.dev/context#WithCancel )。该函数将返回构造的上下文和一个 cancel
函数,我们可以执行该函数来中断使用该上下文发出的调用。因此,现在,我们不仅使用 context.Background()
,而是像这样创建一个上下文:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
注意,在函数结束时调用 cancel
函数以释放与上下文相关的资源非常重要。为了确保函数调用,我们可以使用 defer
。但是,这并不意味着我们只能在函数结束时调用 cancel
函数。
作为一个示例,我们将创建一个虚构的需求。这个需求的目的是在我们遇到一个逾期任务时取消 ListTasks
调用。我们可以同意,从功能角度来看,这是没有意义的,但本节的目的是展示如何取消一个调用。
要实现这个功能,我们将使用 WithCancel
函数创建上下文,将此上下文传递给 ListTasks
API 端点,最后,在读取循环中添加一个 if
语句,检查是否有逾期任务。如果有逾期任务,我们将调用 cancel
函数:
func printTasks(c pb.TodoServiceClient) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
//...
stream, err := c.ListTasks(ctx, req)
//...
for {
//...
if res.Overdue {
log.Printf("CANCEL called")
cancel()
}
fmt.Println(res.Task.String(), "overdue: ", res.Overdue)
}
}
注意,我们本可以在遇到逾期任务时使用 break
来退出,而不是直接调用 cancel
函数。然后,defer cancel()
会起作用,导致服务器停止工作。但是,我决定直接调用 cancel
,并让客户端的循环继续运行,因为我希望向您展示,当我们取消调用时,会收到一个错误。
现在,我们需要知道 cancel
操作传播到网络需要一些时间,因此服务器可能会继续运行,而我们并不知道。为了检查服务器发送的内容,我们将简单地在发送到客户端之前打印任务:
func (s *server) ListTasks(req *pb.ListTasksRequest, stream pb.TodoService_ListTasksServer) error {
return s.d.getTasks(func(t interface{}) error {
//...
log.Println(task)
overdue := //...
err := stream.Send(&pb.ListTasksResponse{
//...
})
return err
})
}
最后,我想提到的是,我们不需要在客户端的 main
函数中添加任何代码,因为我们已经看过,当我们运行当前的 main
代码时,任务已经是逾期的。第一个逾期任务应该出现在更新部分:
fmt.Println("-------UPDATE------")
updateTasks(c, []*pb.UpdateTasksRequest{
{Id: id1, Description: "A better name for the task"},
//...
}...)
printTasks(c, nil)
fmt.Println("-------------------")
这是因为,如果您记得的话,我们更新了任务的 Id
值和描述,并将 Task
的其他属性设置为默认值。这意味着 Done
会被设置为 false
,而 DueDate
会被设置为空时间对象。
现在,我们可以像这样运行服务器:
$ go run ./server 0.0.0.0:50051
listening at 0.0.0.0:50051
然后,运行客户端:
在运行以下代码之前,请确保已注释掉所有会触发 panic 的函数调用,包括我们在上一节中添加的 |
$ go run ./client 0.0.0.0:50051
你应该注意到,在客户端,一切正常运行,即使 update
部分包含以下消息,什么也没有被取消:
CANCEL called.
造成这种情况的原因是,服务器不知道调用已经被取消。为了解决这个问题,我们可以让服务器能够感知取消操作。
为此,我们需要检查上下文的 Done
通道。当取消操作传播到服务器时,Done
通道会被关闭,并且在取消示例中,上下文的错误会是 context.Canceled
。当我们检测到这个事件时,我们就知道服务器需要返回,并有效地停止处理剩余的请求:
func (s *server) ListTasks(req *pb.ListTasksRequest, stream pb.TodoService_ListTasksServer) error {
ctx := stream.Context()
return s.d.getTasks(func(t interface{}) error {
select {
case <-ctx.Done():
switch ctx.Err() {
case context.Canceled:
log.Printf("request canceled: %s", ctx.Err())
default:
}
return ctx.Err()
// TODO: 在生产环境中,将下面的 case 替换为 'default:'。
case <-time.After(1 * time.Millisecond):
}
//...
})
}
在运行此代码之前,有几点需要注意:
-
在处理流时,我们可以通过调用生成的
stream
类型(在我们的例子中是pb.TodoService_ListTasksServer
)中的Context()
函数来获取上下文。 -
请注意,我们故意在每次调用闭包时让服务器休眠 1 毫秒。这在生产环境中不会发生;我们将在
select
中使用default
分支。这样做是为了给服务器留出时间来注意到取消操作。请注意,这个休眠时间是任意的,最小的时间值是为了让我在我的机器上注意到取消错误。您可能需要更改它,或者将其设为更大的值。
现在,我们可以再次运行服务器:
$ go run ./server 0.0.0.0:50051
listening at 0.0.0.0:50051
然后,我们开始运行客户端:
$ go run ./client 0.0.0.0:50051
//...
CANCEL called
id: 1 description: "A better name for the task" due_date: {}
overdue: true
unexpected error: rpc error: code = Canceled desc = context canceled
最后,您应该还会在服务器端看到以下消息:
request canceled: context canceled
总结一下,我们了解了如何使用 context.WithCancel()
来创建一个可取消的上下文。我们还看到,这个函数返回一个 cancel
函数,我们需要在作用域结束时调用它,以释放与上下文关联的资源,但我们也可以根据某些条件在更早的时候调用它来进行取消。最后,我们看到了如何使服务器能够感知取消操作,从而避免执行不必要的工作。