cancel 调用

当您希望根据某些条件停止调用或中断一个长时间运行的流时,gRPC 提供了 cancel 函数,您可以随时执行它们。

如果您之前在分布式系统代码或 API 中使用过 Go,您可能已经遇到过一个名为 context 的类型。这是提供请求范围信息和跨 API 各个参与者进行信号传递的惯用方法,也是 gRPC 中的重要组成部分。

如果您没有注意到,直到现在,我们每次发起请求时都使用了 context.Background()。在 Go 文档中,这被描述为 “返回一个非空的空 Context,它永远不会取消,没有任何值,也没有截止日期”。正如您所猜测的,这个方式并不适合生产就绪的 API,原因有以下几点:

  • 如果用户希望提前终止请求怎么办?

  • 如果 API 调用永远没有返回怎么办?

  • 如果我们需要服务器了解全局值(例如身份验证令牌)怎么办?

在本节中,我们将重点讨论第一个问题,接下来的两节我们将回答其他问题。

为了获得取消调用的能力,我们将使用 context 包中的 WithCancel 函数( https://pkg.go.dev/context#WithCancel )。该函数将返回构造的上下文和一个 cancel 函数,我们可以执行该函数来中断使用该上下文发出的调用。因此,现在,我们不仅使用 context.Background(),而是像这样创建一个上下文:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

注意,在函数结束时调用 cancel 函数以释放与上下文相关的资源非常重要。为了确保函数调用,我们可以使用 defer。但是,这并不意味着我们只能在函数结束时调用 cancel 函数。

作为一个示例,我们将创建一个虚构的需求。这个需求的目的是在我们遇到一个逾期任务时取消 ListTasks 调用。我们可以同意,从功能角度来看,这是没有意义的,但本节的目的是展示如何取消一个调用。

要实现这个功能,我们将使用 WithCancel 函数创建上下文,将此上下文传递给 ListTasks API 端点,最后,在读取循环中添加一个 if 语句,检查是否有逾期任务。如果有逾期任务,我们将调用 cancel 函数:

func printTasks(c pb.TodoServiceClient) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    //...
    stream, err := c.ListTasks(ctx, req)
    //...

    for {
        //...

        if res.Overdue {
            log.Printf("CANCEL called")
            cancel()
        }

        fmt.Println(res.Task.String(), "overdue: ", res.Overdue)
    }
}

注意,我们本可以在遇到逾期任务时使用 break 来退出,而不是直接调用 cancel 函数。然后,defer cancel() 会起作用,导致服务器停止工作。但是,我决定直接调用 cancel,并让客户端的循环继续运行,因为我希望向您展示,当我们取消调用时,会收到一个错误。

现在,我们需要知道 cancel 操作传播到网络需要一些时间,因此服务器可能会继续运行,而我们并不知道。为了检查服务器发送的内容,我们将简单地在发送到客户端之前打印任务:

func (s *server) ListTasks(req *pb.ListTasksRequest, stream pb.TodoService_ListTasksServer) error {
    return s.d.getTasks(func(t interface{}) error {
        //...

        log.Println(task)
        overdue := //...
        err := stream.Send(&pb.ListTasksResponse{
            //...
        })

        return err
    })
}

最后,我想提到的是,我们不需要在客户端的 main 函数中添加任何代码,因为我们已经看过,当我们运行当前的 main 代码时,任务已经是逾期的。第一个逾期任务应该出现在更新部分:

fmt.Println("-------UPDATE------")
updateTasks(c, []*pb.UpdateTasksRequest{
    {Id: id1, Description: "A better name for the task"},
    //...
}...)
printTasks(c, nil)
fmt.Println("-------------------")

这是因为,如果您记得的话,我们更新了任务的 Id 值和描述,并将 Task 的其他属性设置为默认值。这意味着 Done 会被设置为 false,而 DueDate 会被设置为空时间对象。

现在,我们可以像这样运行服务器:

$ go run ./server 0.0.0.0:50051
listening at 0.0.0.0:50051

然后,运行客户端:

在运行以下代码之前,请确保已注释掉所有会触发 panic 的函数调用,包括我们在上一节中添加的 addTask

$ go run ./client 0.0.0.0:50051

你应该注意到,在客户端,一切正常运行,即使 update 部分包含以下消息,什么也没有被取消:

CANCEL called.

造成这种情况的原因是,服务器不知道调用已经被取消。为了解决这个问题,我们可以让服务器能够感知取消操作。

为此,我们需要检查上下文的 Done 通道。当取消操作传播到服务器时,Done 通道会被关闭,并且在取消示例中,上下文的错误会是 context.Canceled。当我们检测到这个事件时,我们就知道服务器需要返回,并有效地停止处理剩余的请求:

func (s *server) ListTasks(req *pb.ListTasksRequest, stream pb.TodoService_ListTasksServer) error {
    ctx := stream.Context()

    return s.d.getTasks(func(t interface{}) error {
        select {
        case <-ctx.Done():
            switch ctx.Err() {
            case context.Canceled:
                log.Printf("request canceled: %s", ctx.Err())
            default:
            }
            return ctx.Err()
        // TODO: 在生产环境中,将下面的 case 替换为 'default:'。
        case <-time.After(1 * time.Millisecond):
        }

		//...
    })
}

在运行此代码之前,有几点需要注意:

  1. 在处理流时,我们可以通过调用生成的 stream 类型(在我们的例子中是 pb.TodoService_ListTasksServer)中的 Context() 函数来获取上下文。

  2. 请注意,我们故意在每次调用闭包时让服务器休眠 1 毫秒。这在生产环境中不会发生;我们将在 select 中使用 default 分支。这样做是为了给服务器留出时间来注意到取消操作。请注意,这个休眠时间是任意的,最小的时间值是为了让我在我的机器上注意到取消错误。您可能需要更改它,或者将其设为更大的值。

现在,我们可以再次运行服务器:

$ go run ./server 0.0.0.0:50051
listening at 0.0.0.0:50051

然后,我们开始运行客户端:

$ go run ./client 0.0.0.0:50051
//...
CANCEL called
id: 1 description: "A better name for the task" due_date: {}
overdue: true
unexpected error: rpc error: code = Canceled desc = context canceled

最后,您应该还会在服务器端看到以下消息:

request canceled: context canceled

总结一下,我们了解了如何使用 context.WithCancel() 来创建一个可取消的上下文。我们还看到,这个函数返回一个 cancel 函数,我们需要在作用域结束时调用它,以释放与上下文关联的资源,但我们也可以根据某些条件在更早的时候调用它来进行取消。最后,我们看到了如何使服务器能够感知取消操作,从而避免执行不必要的工作。