示例: 并发的非阻塞缓存

本节中我们会做一个无阻塞的缓存,这种工具可以帮助我们来解决现实世界中并发程序出现但没有现成的库可以解决的问题。这个问题叫作缓存(memoizing)函数(译注:Memoization的定义: memoization 一词是 Donald Michie 根据拉丁语 memorandum 杜撰的一个词。相应的动词、过去分词、ing形式有 memoiz、memoized、memoizing ),也就是说,我们需要缓存函数的返回结果,这样在对函数进行调用的时候,我们就只需要一次计算,之后只要返回计算的结果就可以了。我们的解决方案会是并发安全且会避免对整个缓存加锁而导致所有操作都去争一个锁的设计。

我们将使用下面的 httpGetBody 函数作为我们需要缓存的函数的一个样例。这个函数会去进行 HTTP GET 请求并且获取 http 响应 body。对这个函数的调用本身开销是比较大的,所以我们尽量避免在不必要的时候反复调用。

func httpGetBody(url string) (interface{}, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}
go

最后一行稍微隐藏了一些细节。ReadAll 会返回两个结果,一个 []byte 数组和一个错误,不过这两个对象可以被赋值给 httpGetBody 的返回声明里的 interface{}error 类型,所以我们也就可以这样返回结果并且不需要额外的工作了。我们在 httpGetBody 中选用这种返回类型是为了使其可以与缓存匹配。

下面是我们要设计的 cache 的第一个“草稿”:

ch9/memo1
Unresolved include directive in modules/ROOT/pages/ch9/ch9-07.adoc - include::example$/ch9/memo1/memo.go[]
go

Memo 实例会记录需要缓存的函数 f(类型为 Func),以及缓存内容(里面是一个 string 到 result 映射的 map)。每一个 result 都是简单的函数返回的值对儿——一个值和一个错误值。继续下去我们会展示一些 Memo 的变种,不过所有的例子都会遵循上面的这些方面。

下面是一个使用 Memo 的例子。对于流入的 URL 的每一个元素我们都会调用 Get,并打印调用延时以及其返回的数据大小的 log:

m := memo.New(httpGetBody)
for url := range incomingURLs() {
    start := time.Now()
    value, err := m.Get(url)
    if err != nil {
        log.Print(err)
    }
    fmt.Printf("%s, %s, %d bytes\n",
    url, time.Since(start), len(value.([]byte)))
}
go

我们可以使用测试包(第11章的主题)来系统地鉴定缓存的效果。从下面的测试输出,我们可以看到URL流包含了一些重复的情况,尽管我们第一次对每一个URL的 (*Memo).Get 的调用都会花上几百毫秒,但第二次就只需要花1毫秒就可以返回完整的数据了。

$ go test.txt -v gopl.io/ch9/memo1
=== RUN   Test
https://golang.org, 175.026418ms, 7537 bytes
https://godoc.org, 172.686825ms, 6878 bytes
https://play.golang.org, 115.762377ms, 5767 bytes
http://gopl.io, 749.887242ms, 2856 bytes
https://golang.org, 721ns, 7537 bytes
https://godoc.org, 152ns, 6878 bytes
https://play.golang.org, 205ns, 5767 bytes
http://gopl.io, 326ns, 2856 bytes
--- PASS: Test (1.21s)
PASS
ok  gopl.io/ch9/memo1   1.257s
bash

这个测试是顺序地去做所有的调用的。

由于这种彼此独立的 HTTP 请求可以很好地并发,我们可以把这个测试改成并发形式。可以使用 sync.WaitGroup 来等待所有的请求都完成之后再返回。

m := memo.New(httpGetBody)
var n sync.WaitGroup
for url := range incomingURLs() {
    n.Add(1)
    go func(url string) {
        start := time.Now()
        value, err := m.Get(url)
        if err != nil {
            log.Print(err)
        }
        fmt.Printf("%s, %s, %d bytes\n",
        url, time.Since(start), len(value.([]byte)))
        n.Done()
    }(url)
}
n.Wait()
go

这次测试跑起来更快了,然而不幸的是貌似这个测试不是每次都能够正常工作。我们注意到有一些意料之外的 cache miss(缓存未命中),或者命中了缓存但却返回了错误的值,或者甚至会直接崩溃。

但更糟糕的是,有时候这个程序还是能正确的运行(译:也就是最让人崩溃的偶发bug),所以我们甚至可能都不会意识到这个程序有 bug。但是我们可以使用 -race 这个 flag 来运行程序,竞争检测器(§9.6)会打印像下面这样的报告:

$ go test.txt -run=TestConcurrent -race -v gopl.io/ch9/memo1
=== RUN   TestConcurrent
...
WARNING: DATA RACE
Write by goroutine 36:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
  ...
Previous write by goroutine 35:
  runtime.mapassign1()
      ~/go/src/runtime/hashmap.go:411 +0x0
  gopl.io/ch9/memo1.(*Memo).Get()
      ~/gobook2/src/gopl.io/ch9/memo1/memo.go:32 +0x205
...
Found 1 data race(s)
FAIL    gopl.io/ch9/memo1   2.393s
bash

memo.go 的 32 行出现了两次,说明有两个 goroutine 在没有同步干预的情况下更新了 cache map。这表明 Get 不是并发安全的,存在数据竞争。

28  func (memo *Memo) Get(key string) (interface{}, error) {
29      res, ok := memo.cache(key)
30      if !ok {
31          res.value, res.err = memo.f(key)
32          memo.cache[key] = res
33      }
34      return res.value, res.err
35  }
go

最简单的使 cache 并发安全的方式是使用基于监控的同步。只要给 Memo 加上一个 mutex ,在 Get 的一开始获取互斥锁,return 的时候释放锁,就可以让 cache 的操作发生在临界区内了:

ch9/memo2
Unresolved include directive in modules/ROOT/pages/ch9/ch9-07.adoc - include::example$/ch9/memo2/memo.go[]
go

测试依然并发进行,但这回竞争检查器“沉默”了。不幸的是对于 Memo 的这一点改变使我们完全丧失了并发的性能优点。每次对 f 的调用期间都会持有锁,Get 将本来可以并行运行的 I/O 操作串行化了。我们本章的目的是完成一个无锁缓存,而不是现在这样的将所有请求串行化的函数的缓存。

下一个 Get 的实现,调用 Get 的 goroutine 会两次获取锁:查找阶段获取一次,如果查找没有返回任何内容,那么进入更新阶段会再次获取。在这两次获取锁的中间阶段,其它 goroutine 可以随意使用 cache。

ch9/memo3
Unresolved include directive in modules/ROOT/pages/ch9/ch9-07.adoc - include::example$/ch9/memo3/memo.go[]
go

这些修改使性能再次得到了提升,但有一些 URL 被获取了两次。这种情况在两个以上的 goroutine 同一时刻调用 Get 来请求同样的 URL 时会发生。多个 goroutine 一起查询cache,发现没有值,然后一起调用 f 这个慢不拉叽的函数。在得到结果后,也都会去更新 map 。其中一个获得的结果会覆盖掉另一个的结果。

理想情况下是应该避免掉多余的工作的。而这种“避免”工作一般被称为 duplicate suppression(重复抑制/避免)。下面版本的 Memo 每一个 map 元素都是指向一个条目的指针。每一个 entry 包含对函数 f 调用结果的内容缓存。与之前不同的是这次 entry 还包含了一个叫 ready 的 channel。在 entry 的 res 字段被设置之后,这个 channel 就会被关闭,以向其它 goroutine 广播(§8.9)去读取该 entry 内的结果是安全的了。

ch9/memo4
Unresolved include directive in modules/ROOT/pages/ch9/ch9-07.adoc - include::example$/ch9/memo4/memo.go[]
go

现在 Get 函数包括下面这些步骤了:获取互斥锁来保护共享变量 cache map,查询 map 中是否存在指定条目,如果没有找到那么分配空间插入一个新条目,释放互斥锁。如果存在条目的话且其值没有写入完成(也就是有其它的 goroutine 在调用 f 这个慢函数)时, goroutine 必须等待值 ready 之后才能读到条目的结果。而想知道是否 ready 的话,可以直接从 ready channel 中读取,由于这个读取操作在 channel 关闭之前一直是阻塞。

如果没有条目的话,需要向 map 中插入一个没有准备好的条目,当前正在调用的 goroutine 就需要负责调用慢函数、更新条目以及向其它所有 goroutine 广播条目已经 ready 可读的消息了。

条目中的 e.res.value 和 e.res.err 变量是在多个 goroutine 之间共享的。创建条目的 goroutine 同时也会设置条目的值,其它 goroutine 在收到 "ready" 的广播消息之后立刻会去读取条目的值。尽管会被多个 goroutine 同时访问,但却并不需要互斥锁。ready channel 的关闭一定会发生在其它 goroutine 接收到广播事件之前,因此第一个 goroutine 对这些变量的写操作是一定发生在这些读操作之前的。不会发生数据竞争。

这样并发、不重复、无阻塞的 cache 就完成了。

上面这样 Memo 的实现使用了一个互斥量来保护多个 goroutine 调用 Get 时的共享 map 变量。不妨把这种设计和前面提到的把 map 变量限制在一个单独的 monitor goroutine 的方案做一些对比,后者在调用 Get 时需要发消息。

Func、result 和 entry 的声明和之前保持一致:

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

// A result is the result of calling a Func.
type result struct {
    value interface{}
    err   error
}

type entry struct {
    res   result
    ready chan struct{} // closed when res is ready
}
go

然而 Memo 类型现在包含了一个叫做 requests 的 channel,Get 的调用方用这个 channel 来和 monitor goroutine 来通信。requests channel 中的元素类型是 request。Get 的调用方会把这个结构中的两组 key 都填充好,实际上用这两个变量来对函数进行缓存的。另一个叫 response 的 channel 会被拿来发送响应结果。这个 channel 只会传回一个单独的值。

ch9/memo5
Unresolved include directive in modules/ROOT/pages/ch9/ch9-07.adoc - include::example$/ch9/memo5/memo.go[]
go

上面的 Get 方法,会创建一个 response channel,把它放进 request 结构中,然后发送给 monitor goroutine,然后马上又会接收它。

cache 变量被限制在了 monitor goroutine (*Memo).server 中,下面会看到。monitor 会在循环中一直读取请求,直到 request channel 被 Close 方法关闭。每一个请求都会去查询 cache,如果没有找到条目的话,那么就会创建/插入一个新的条目。

ch9/memo5
Unresolved include directive in modules/ROOT/pages/ch9/ch9-07.adoc - include::example$/ch9/memo5/memo.go[]
go

和基于互斥量的版本类似,第一个对某个 key 的请求需要负责去调用函数 f 并传入这个 key,将结果存在条目里,并关闭 ready channel 来广播条目的 ready 消息。使用 (*entry).call 来完成上述工作。

紧接着对同一个 key 的请求会发现 map 中已经有了存在的条目,然后会等待结果变为 ready,并将结果从 response 发送给客户端的 goroutine。上述工作是用 (*entry).deliver 来完成的。对 call 和 deliver 方法的调用必须让它们在自己的 goroutine 中进行以确保 monitor goroutines 不会因此而被阻塞住而没法处理新的请求。

这个例子说明我们无论用上锁,还是通信来建立并发程序都是可行的。

上面的两种方案并不好说特定情境下哪种更好,不过了解他们还是有价值的。有时候从一种方式切换到另一种可以使你的代码更为简洁。(译注:不是说好的 golang 推崇通信并发么。)


练习 9.3: 扩展 Func 类型和 (*Memo).Get 方法,支持调用方提供一个可选的 done channel,使其具备通过该 channel 来取消整个操作的能力(§8.9)。一个被取消了的 Func 的调用结果不应该被缓存。