服务器流式 API
在底层协议方面,服务器流式 API 使用客户端的 |
现在,我们已经知道如何注册服务、与 “数据库” 进行交互,并运行客户端和服务器,接下来一切都会变得更快。我们将主要集中在 API 端点本身。在我们的案例中,我们将创建一个 ListTasks
端点,顾名思义,它将列出数据库中所有可用的任务。
为了让这个功能稍微复杂一点,我们将在列出每个任务时,返回该任务是否过期。这主要是为了让你看到如何在响应对象中提供更多关于特定对象的信息。
因此,在 todo.proto
文件中,我们将添加一个名为 ListTasks
的 RPC 端点,它将接收 ListTasksRequest
并返回一个 ListTasksResponse
的流。这就是服务器流式 API。我们收到一个请求并返回零个或多个响应:
message ListTasksRequest {
}
message ListTasksResponse {
Task task = 1;
bool overdue = 2;
}
service TodoService {
//...
rpc ListTasks(ListTasksRequest) returns (stream ListTasksResponse);
}
注意,这次我们发送的是一个空对象作为请求,并返回多个任务以及它们是否过期。我们本可以通过发送任务 ID 的范围来让请求更智能(例如分页),但为了简洁起见,我们选择保持简单。
数据库演进
在能够实现 ListTasks
端点之前,我们需要一种方法来访问 TODO 列表中的所有元素。再次强调,我们不希望将 db
接口与生成的代码耦合在一起,因此我们有几种选择:
-
我们创建一些抽象来遍历任务。对于我们的内存数据库,这可能是可以接受的,但如果是 PostgreSQL 等数据库呢?
-
我们将接口绑定到已有的抽象上,例如数据库的游标。这种方式稍微好一点,但我们仍然会使接口耦合。
-
我们干脆将遍历任务的责任交给
db
接口的实现,然后将一个用户提供的函数应用于所有行。这样,我们就不会与其他组件耦合。
因此,我们决定将遍历任务的责任交给 db
接口的实现。这意味着,我们将向 inMemoryDb
中添加一个新函数,该函数会遍历所有任务,并将一个作为参数传递的函数应用到每个任务上:
type db interface {
// ...
getTasks(f func(interface{}) error) error
}
如你所见,作为参数传递的函数将接收一个 interface{}
类型的参数。虽然这在类型安全性上有些不足,但我们将在后续处理中确保在运行时接收到一个 Task
对象。
然后,对于内存实现,我们有如下内容:
func (d *inMemoryDb) getTasks(f func(interface{}) error) error {
for _, task := range d.tasks {
if err := f(task); err != nil {
return err
}
}
return nil
}
这里有一点需要注意,我们将错误视为致命的。如果用户提供的函数返回错误,getTasks
函数将返回该错误并终止。
至此,数据库部分完成了。我们现在可以从数据库中获取所有任务并应用某些逻辑。接下来,我们将实现 ListTasks
端点。
实现 ListTasks
为了实现该端点,我们将从 proto
文件生成 Go 代码,并获取我们需要实现的函数签名。我们只需运行以下命令:
$ protoc --go_out=. \
--go_opt=paths=source_relative \
--go-grpc_out=. \
--go-grpc_opt=paths=source_relative \
proto/todo/v1/*.proto
查看 proto/todo/v1/todo_grpc.pb.go
,我们会看到 TodoServiceServer
接口中增加了一个函数:
type TodoServiceServer interface {
// ...
ListTasks(*ListTasksRequest, TodoService_ListTasksServer) error
// ...
}
如上所示,签名与我们之前的 AddTask
函数稍有不同。我们现在只返回一个错误或 nil
,并且有一个 TodoService_ListTasksServer
类型的参数。
深入研究生成的代码,我们可以看到 TodoService_ListTasksServer
被定义为以下形式:
type TodoService_ListTasksServer interface {
Send(*ListTasksResponse) error
grpc.ServerStream
}
这实际上是一个流,可以用来发送多个 ListTasksResponse
对象。
了解了这些信息后,我们就可以在代码中实现这个函数了。我们可以打开 server/impl.go
文件,并将 ListTasks
函数的签名复制粘贴到 TodoServiceServer
接口中:
func (s *server) ListTasks(req *pb.ListTasksRequest, stream pb.TodoService_ListTasksServer) error {
}
显然,我们添加了服务器类型以指定我们正在为服务器实现 ListTasks
,并命名了参数。req
是从客户端接收到的请求,stream
是我们将用来发送多个响应的对象。
然后,我们函数的逻辑再次非常简单。我们将遍历所有任务,确保我们处理的是 Task
对象,对于每个任务,我们将通过检查这些任务是否已完成(对于已完成的任务没有逾期)以及 due_date
字段是否早于当前时间来 “计算” 逾期情况。总之,我们将创建一个包含这些信息的 ListTasksResponse
,并将其发送给客户端(server/impl.go
)。
func (s *server) ListTasks(req *pb.ListTasksRequest, stream pb.TodoService_ListTasksServer) error {
return s.d.getTasks(func(t interface{}) error {
task := t.(*pb.Task)
overdue := task.DueDate != nil && !task.Done &&
task.DueDate.AsTime().Before(time.Now().UTC())
err := stream.Send(&pb.ListTasksResponse{
Task: task,
Overdue: overdue,
})
return err
})
}
需要注意的是,AsTime
函数将返回一个 UTC 时区的时间,因此在比较时需要确保另一个时间也处于 UTC 时区。这就是为什么我们使用 time.Now().UTC()
而不是 time.Now()
的原因。
显然,这个函数有可能失败(例如,如果名为 t
的变量不是 Task
类型怎么办?),但现在我们先不太担心错误处理。我们稍后会讨论这个问题。现在,让我们从客户端调用这个端点。
从客户端调用 ListTasks
要从客户端调用 ListTasks
API 端点,我们需要了解如何消费一个服务器流式 RPC 端点。为此,我们查看方法签名或在接口名 TodoServiceClient
中生成的函数。它应该看起来像以下这样:
ListTasks(ctx context.Context, in *ListTasksRequest, opts ...grpc.CallOption) (TodoService_ListTasksClient, error)
我们可以看到,我们需要传递一个上下文和一个请求,并且有一些可选的调用选项。然后,我们还可以看到,我们将获得一个 TodoService_ListTasksClient
或一个错误。
TodoService_ListTasksClient
类型与我们在 ListTasks
端点中处理的流非常相似。主要区别是,代替 Send
函数,我们现在有一个名为 Recv
的函数。以下是 TodoService_ListTasksClient
的定义:
type TodoService_ListTasksClient interface {
Recv() (*ListTasksResponse, error)
grpc.ClientStream
}
因此,我们将如何处理这个流呢?我们将循环调用 Recv
,获取所有响应,直到某一时刻,服务器会告诉我们:“我完成了。” 这通常会发生在我们收到等于 io.EOF
的错误时。
我们可以在 client/main.go
中创建一个名为 printTasks
的函数,它将反复调用 Recv
,并检查我们是否完成或发生了错误。如果没有错误,它会打印出任务对象的字符串表示:
func printTasks(c pb.TodoServiceClient) {
req := &pb.ListTasksRequest{}
stream, err := c.ListTasks(context.Background(), req)
if err != nil {
log.Fatalf("unexpected error: %v", err)
}
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("unexpected error: %v", err)
}
fmt.Println(res.Task.String(), "overdue: ", res.Overdue)
}
}
有了这个函数后,我们可以在 main
函数中的 addTask
调用之后调用它:
fmt.Println("--------ADD--------")
//...
fmt.Println("--------LIST-------")
printTasks(c)
fmt.Println("-------------------")
现在,我们可以使用 go run
来先运行服务器,再运行客户端。因此,在项目根目录下,我们可以运行以下命令:
$ go run ./server 0.0.0.0:50051
listening at 0.0.0.0:50051
然后运行我们的客户端:
$ go run ./client 0.0.0.0:50051
//...
--------LIST-------
id:1 description:"This is a task" due_date: {seconds:1680158076 nanos:574914000} overdue: false
-------------------
这如预期那样工作。现在,我建议你自己尝试添加更多的任务,使用不同的值,并在添加所有任务后使用 printTasks
函数。这应该有助于你熟悉这个 API。
现在我们可以添加任务并列出所有任务了,但如果我们能够更新已存在的任务,标记任务为已完成或更改到期日期,那会很有用。我们将通过客户端流式 API 来测试这一点。