基础教程

这是一个关于如何在 Go 中使用 gRPC 的基础教程。

通过这个教程,你将学习如何:

  • .proto 文件中定义服务。

  • 使用协议缓冲编译器生成服务器和客户端代码。

  • 使用 Go gRPC API 编写简单的客户端和服务器。

假设你已经阅读了 gRPC 介绍 并且熟悉 协议缓冲。需要注意的是,本教程使用的是 proto3 版本的协议缓冲语言:你可以在 proto3 语言指南Go 生成的代码指南 中找到更多信息。

为什么使用 gRPC?

我们的示例是一个简单的路线映射应用程序,允许客户端获取有关其路线上的特征信息,创建路线的摘要,并与服务器和其他客户端交换路线信息,例如交通更新。

使用 gRPC,我们可以在 .proto 文件中定义服务一次,并生成支持语言中的客户端和服务器代码,这些代码可以在从大数据中心的服务器到你自己的平板电脑等各种环境中运行——gRPC 为你处理了不同语言和环境之间通信的所有复杂性。我们还可以获得使用协议缓冲的所有优势,包括高效的序列化、简单的 IDL 以及易于更新接口。

设置

你应该已经安装了生成客户端和服务器接口代码所需的工具——如果没有,请查看 快速开始 中的 先决条件 部分。

获取示例代码

示例代码位于 grpc-go 仓库中。

  1. 你可以 将仓库下载为 ZIP 文件 并解压,或者克隆仓库:

    git clone -b v1.69.2 --depth 1 https://github.com/grpc/grpc-go
  2. 然后切换到示例目录:

    cd grpc-go/examples/route_guide

定义服务

我们的第一步(就像你在 gRPC 介绍 中知道的那样)是使用 协议缓冲 定义 gRPC 服务以及方法请求和响应类型。完整的 .proto 文件请参见 routeguide/route_guide.proto

要定义一个服务,你需要在 .proto 文件中指定一个命名的 service

service RouteGuide {
   ...
}

然后你在服务定义中定义 rpc 方法,指定它们的请求和响应类型。gRPC 允许你定义四种类型的服务方法,所有这些方法都在 RouteGuide 服务中使用:

  • 简单 RPC,客户端发送请求到服务器,并等待响应返回,像普通的函数调用一样。

    // 获取给定位置的特征。
    rpc GetFeature(Point) returns (Feature) {}
  • 服务器端流式 RPC,客户端发送请求到服务器,获取一个流来读取一系列消息,客户端从返回的流中读取消息直到没有更多的消息。你通过将 stream 关键字放在响应类型前面来指定服务器端流式方法。

    // 获取给定矩形区域内的特征。结果是流式返回的,而不是一次性返回(例如在响应消息中使用重复字段),因为矩形可能覆盖一个大区域,包含大量特征。
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
  • 客户端流式 RPC,客户端写入一系列消息并将它们发送到服务器,客户端完成写入后,等待服务器读取完所有消息并返回响应。你通过将 stream 关键字放在请求类型前面来指定客户端流式方法。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
  • 双向流式 RPC,客户端和服务器都通过读写流发送一系列消息。两条流是独立操作的,因此客户端和服务器可以按照它们喜欢的顺序进行读写。例如,服务器可以在收到所有客户端消息后再写回应,或者可以交替进行读写,或者以其他顺序进行。你通过将 stream 关键字放在请求和响应类型前面来指定这种类型的方法。

    // 接受一个在遍历路线上发送的 RouteNote 流,同时接收其他 RouteNote(例如来自其他用户)。
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

我们的 .proto 文件还包含所有请求和响应类型的协议缓冲消息类型定义——例如,下面是 Point 消息类型:

// 点是用纬度-经度对表示的,使用 E7 表示法(度乘以 10**7 并四舍五入到最近的整数)。
// 纬度应在 +/- 90 度范围内,经度应在 +/- 180 度范围内(包含)。
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

生成客户端和服务器代码

接下来,我们需要从 .proto 服务定义中生成 gRPC 客户端和服务器接口。这是通过协议缓冲编译器 protoc 和一个特殊的 gRPC Go 插件来完成的。这类似于我们在 快速开始 中做的那样。

examples/route_guide 目录下,运行以下命令:

protoc --go_out=. --go_opt=paths=source_relative \
    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
    routeguide/route_guide.proto

运行此命令会在 routeguide 目录下生成以下文件:

  • route_guide.pb.go:包含填充、序列化和检索请求和响应消息类型的所有协议缓冲代码。

  • route_guide_grpc.pb.go:包含以下内容:

    • 一个接口类型(或存根),客户端可以通过该接口调用 RouteGuide 服务中定义的方法。

    • 服务器实现的接口类型,包含 RouteGuide 服务中定义的方法。

创建服务器

首先,让我们看看如何创建一个 RouteGuide 服务器。如果你只对创建 gRPC 客户端感兴趣,可以跳过这一部分,直接进入 创建客户端(尽管你可能会觉得它也很有趣!)。

创建 RouteGuide 服务有两个部分:

  • 实现从服务定义生成的服务接口:执行服务的实际 “工作”。

  • 运行一个 gRPC 服务器来监听来自客户端的请求,并将它们分发到正确的服务实现。

你可以在 server/server.go 中找到我们的示例 RouteGuide 服务器。让我们深入了解它是如何工作的。

实现 RouteGuide

如你所见,我们的服务器有一个 routeGuideServer 结构体类型,它实现了生成的 RouteGuideServer 接口:

type routeGuideServer struct {
        ...
}

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
        ...
}

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
        ...
}

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
        ...
}

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
        ...
}

简单 RPC

routeGuideServer 实现了我们所有的服务方法。我们首先看一下最简单的类型 GetFeature,它仅仅从客户端获取一个 Point,并返回相应的 Feature 信息。

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
  for _, feature := range s.savedFeatures {
    if proto.Equal(feature.Location, point) {
      return feature, nil
    }
  }
  // 没有找到特征,返回一个没有命名的特征
  return &pb.Feature{Location: point}, nil
}

该方法接收一个用于RPC的上下文对象以及客户端的 Point 协议缓冲请求。它返回一个 Feature 协议缓冲对象作为响应信息,以及一个 error。在该方法中,我们用适当的信息填充 Feature,然后返回它并且错误为 nil,告诉 gRPC 我们已经完成了 RPC 处理,并且 Feature 可以返回给客户端。

服务器端流式RPC

现在,让我们来看一下我们的一个流式 RPC。ListFeatures 是一个服务器端流式 RPC,因此我们需要向客户端返回多个 Feature

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
  for _, feature := range s.savedFeatures {
    if inRange(feature.Location, rect) {
      if err := stream.Send(feature); err != nil {
        return err
      }
    }
  }
  return nil
}

正如你所看到的,与我们方法参数中简单的请求和响应对象不同,这次我们获得了一个请求对象(客户端希望在其中查找 Features 的 Rectangle)以及一个特殊的 RouteGuide_ListFeaturesServer 对象,用来写入我们的响应。

在方法中,我们根据需要填充多个 Feature 对象,并使用 RouteGuide_ListFeaturesServerSend() 方法将它们发送出去。最后,和简单RPC一样,我们返回一个 nil 错误,告诉 gRPC 我们已经完成了响应的写入。如果在这个调用中发生任何错误,我们会返回一个非 nil 的错误;gRPC 层会将其转换为适当的RPC状态,并发送到网络中。

客户端流式 RPC

现在,让我们看看一个稍微复杂一点的情况:客户端流式方法 RecordRoute,在这里我们从客户端接收一系列的 Point,并返回一个包含旅行信息的单一 RouteSummary。正如你所看到的,这次方法根本没有请求参数。相反,它获取一个 RouteGuide_RecordRouteServer 流,服务器可以用它来读取和写入消息——它可以使用 Recv() 方法接收客户端的消息,并通过 SendAndClose() 方法返回一个单一的响应。

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
  var pointCount, featureCount, distance int32
  var lastPoint *pb.Point
  startTime := time.Now()
  for {
    point, err := stream.Recv()
    if err == io.EOF {
      endTime := time.Now()
      return stream.SendAndClose(&pb.RouteSummary{
        PointCount:   pointCount,
        FeatureCount: featureCount,
        Distance:     distance,
        ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
      })
    }
    if err != nil {
      return err
    }
    pointCount++
    for _, feature := range s.savedFeatures {
      if proto.Equal(feature.Location, point) {
        featureCount++
      }
    }
    if lastPoint != nil {
      distance += calcDistance(lastPoint, point)
    }
    lastPoint = point
  }
}

在方法体内,我们使用 RouteGuide_RecordRouteServerRecv() 方法重复读取客户端的请求到请求对象(在这个例子中是 Point),直到没有更多的消息为止:服务器需要在每次调用后检查 Recv() 返回的错误。如果是 nil,表示流仍然有效,服务器可以继续读取;如果是 io.EOF,表示消息流已经结束,服务器可以返回其 RouteSummary。如果是其他值,我们将直接返回该错误,以便 gRPC 层将其转换为适当的 RPC 状态。

双向流式RPC

最后,让我们看看我们的双向流式RPC RouteChat()

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      return nil
    }
    if err != nil {
      return err
    }
    key := serialize(in.Location)
                ... // look for notes to be sent to client
    for _, note := range s.routeNotes[key] {
      if err := stream.Send(note); err != nil {
        return err
      }
    }
  }
}

这次我们得到一个 RouteGuide_RouteChatServer 流,正如我们在客户端流式示例中一样,可以用来读取和写入消息。然而,这次我们在客户端仍然向其消息流写入消息的同时,通过方法的流返回值。

在这里,读取和写入的语法与我们的客户端流式方法非常相似,不同之处在于服务器使用流的 Send() 方法,而不是 SendAndClose(),因为它正在写入多个响应。尽管每一方总是按照写入的顺序接收到对方的消息,但客户端和服务器都可以按任意顺序进行读写——这些流是完全独立操作的。

启动服务器

实现了所有方法之后,我们还需要启动一个 gRPC 服务器,这样客户端才能实际使用我们的服务。以下是我们如何启动 RouteGuide 服务的代码:

lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
if err != nil {
  log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
...
grpcServer := grpc.NewServer(opts...)
pb.RegisterRouteGuideServer(grpcServer, newServer())
grpcServer.Serve(lis)

要构建并启动服务器,我们:

  1. 使用以下代码指定要监听的端口:

    lis, err := net.Listen(...)
  2. 使用 grpc.NewServer(…​) 创建一个 gRPC 服务器实例。

  3. 在 gRPC 服务器上注册我们的服务实现。

  4. 使用端口详情调用服务器的 Serve() 方法,以进行阻塞等待,直到进程被终止或调用 Stop()

创建客户端

在这一部分,我们将看看如何为我们的 RouteGuide 服务创建一个 Go 客户端。完整的客户端代码请参见 grpc-go/examples/route_guide/client/client.go

创建存根

要调用服务方法,我们首先需要创建一个 gRPC 客户端存根,它将为我们提供一个与服务器的连接,允许我们调用服务方法。

var opts []grpc.DialOption
...
conn, err := grpc.NewClient(*serverAddr, opts...)
if err != nil {
  ...
}
defer conn.Close()

调用服务

要调用服务方法,首先需要创建一个 gRPC 通道与服务器通信。我们通过将服务器地址和端口号传递给 grpc.NewClient() 来创建它,如下所示:

var opts []grpc.DialOption
...
conn, err := grpc.NewClient(*serverAddr, opts...)
if err != nil {
  ...
}
defer conn.Close()

您可以使用 DialOptionsgrpc.NewClient 中设置身份验证凭据(例如,TLS、GCE 凭据或 JWT 凭据),当服务需要这些凭据时。RouteGuide 服务不需要任何凭据。

一旦 gRPC 通道设置好,我们就需要一个客户端存根来执行 RPC。我们通过 pb 包中提供的 NewRouteGuideClient 方法来获取它,该方法是从示例 .proto 文件生成的。

client := pb.NewRouteGuideClient(conn)

调用服务方法

现在让我们看看如何调用我们的服务方法。请注意,在 gRPC-Go 中,RPC 以阻塞/同步模式运行,这意味着 RPC 调用会等待服务器的响应,并且会返回响应或错误。

简单 RPC

调用简单的 RPC GetFeature 几乎就像调用一个本地方法一样直接。

feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
  ...
}

如你所见,我们在之前获取的存根上调用该方法。在方法参数中,我们创建并填充一个请求协议缓冲区对象(在我们的例子中是 Point)。我们还传递一个 context.Context 对象,它让我们在必要时改变 RPC 的行为,比如设置超时或取消正在进行的 RPC。如果调用没有返回错误,我们就可以从第一个返回值中读取从服务器收到的响应信息。

log.Println(feature)

服务器端流 RPC

在这里我们调用了服务器端流式方法 ListFeatures,它返回一系列地理 特征。如果你已经阅读过 创建服务器,那么其中的一些内容可能会非常熟悉——流式 RPC 在客户端和服务器端的实现方式是类似的。

rect := &pb.Rectangle{ ... }  // initialize a pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
  ...
}
for {
    feature, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
    }
    log.Println(feature)
}

与简单 RPC 一样,我们将方法传递一个上下文和一个请求。然而,返回的不是一个响应对象,而是一个 RouteGuide_ListFeaturesClient 实例。客户端可以使用 RouteGuide_ListFeaturesClient 流来读取服务器的响应。

我们使用 RouteGuide_ListFeaturesClientRecv() 方法重复读取服务器的响应到一个响应协议缓冲对象(在这个例子中是 Feature),直到没有更多的消息:客户端需要在每次调用 Recv() 后检查返回的错误 err。如果是 nil,流仍然有效,可以继续读取;如果是 io.EOF,则消息流已结束;否则,必须是 RPC 错误,错误信息通过 err 传递。

客户端流 RPC

客户端流式方法 RecordRoute 与服务器端的方法类似,不同之处在于我们只传递给方法一个上下文,并返回一个 RouteGuide_RecordRouteClient 流,客户端可以使用该流来同时写入和读取消息。

// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
  points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
  log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
  if err := stream.Send(point); err != nil {
    log.Fatalf("%v.Send(%v) = %v", stream, point, err)
  }
}
reply, err := stream.CloseAndRecv()
if err != nil {
  log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)

RouteGuide_RecordRouteClient 有一个 Send() 方法,我们可以用它向服务器发送请求。一旦我们通过 Send() 将客户端的请求写入流中,就需要调用流的 CloseAndRecv() 方法,以通知 gRPC 我们已经完成写入并且期待接收响应。我们通过 CloseAndRecv() 返回的 err 获取 RPC 状态。如果状态为 nil,那么 CloseAndRecv() 的第一个返回值将是有效的服务器响应。

双端流 PRC

最后,让我们来看一下双向流式 RPC RouteChat()。与 RecordRoute 的情况一样,我们只需要传递一个上下文对象给方法,并且返回一个我们可以用来同时读写消息的流。然而,这次我们通过方法的流返回值,同时服务器仍在向它们的消息流写入消息。

stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      // read done.
      close(waitc)
      return
    }
    if err != nil {
      log.Fatalf("Failed to receive a note : %v", err)
    }
    log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
  }
}()
for _, note := range notes {
  if err := stream.Send(note); err != nil {
    log.Fatalf("Failed to send a note: %v", err)
  }
}
stream.CloseSend()
<-waitc

这里的读写语法与我们的客户端流式方法非常相似,不同之处在于我们在完成调用后使用流的 CloseSend() 方法。尽管每一方始终会按写入顺序接收到对方的消息,但客户端和服务器都可以以任何顺序读写消息——流是完全独立操作的。

试用

examples/route_guide 目录执行以下命令:

  1. 运行服务器:

    $ go run server/server.go
  2. 从另一个终端运行客户端

    $ go run client/client.go

您将看到这样的输出:

Getting feature for point (409146138, -746188906)
name:"Berkshire Valley Management Area Trail, Jefferson, NJ, USA" location:<latitude:409146138 longitude:-746188906 >
Getting feature for point (0, 0)
location:<>
Looking for features within lo:<latitude:400000000 longitude:-750000000 > hi:<latitude:420000000 longitude:-730000000 >
name:"Patriots Path, Mendham, NJ 07945, USA" location:<latitude:407838351 longitude:-746143763 >
...
name:"3 Hasta Way, Newton, NJ 07860, USA" location:<latitude:410248224 longitude:-747127767 >
Traversing 56 points.
Route summary: point_count:56 distance:497013163
Got message First message at point(0, 1)
Got message Second message at point(0, 2)
Got message Third message at point(0, 3)
Got message First message at point(0, 1)
Got message Fourth message at point(0, 1)
Got message Second message at point(0, 2)
Got message Fifth message at point(0, 2)
Got message Third message at point(0, 3)
Got message Sixth message at point(0, 3)

我们省略了本页所示的客户机和服务器跟踪输出中的时间戳。