应用并发示例

到目前为止,我们已经了解了 goroutine 和通道的主要操作和行为。这两种并发机制非常重要,因为它们是 Go 实现并发的关键。然而,Go 标准库还在其 sync 包中提供了并发原语。它包含各种用途的同步原语:

  • sync.Map 是一个并发安全的映射实现。我们将在下一节中探讨如何创建其他线程安全的数据结构。

  • sync.Mutex 是一个排他锁。它允许我们控制资源,一次只能由一个 goroutine 使用。根据解决的问题,还可以使用只读或读写互斥锁。

  • sync.Once 是一个只能获取一次的特殊锁。这对于包装诸如清理代码之类的语句非常有用,这些代码应该只运行一次。

  • sync.Pool 是一个临时对象集合,可以单独保存和检索。它可以被视为对象缓存,使得创建线程安全的列表变得容易。

  • sync.WaitGroup 用于等待一组 goroutine 完成。该原语内部有一个计数器和一个锁,使其能够跟踪在完成之前需要等待多少个 goroutine。这可以大大简化主 goroutine

您可以在官方 Go 文档( https://pkg.go.dev/sync )中阅读 sync 包中同步原语的完整文档。这些设计良好的同步原语为我们提供了解决多种类型问题的工具。让我们在接下来的章节中看看其中一些的实际应用。

只关闭一次

正如我们在图9.5 中看到的,如果我们尝试多次关闭通道,通道会引发 panic。这是使用 sync.Once 的一个很好的候选场景,尽管我们可以想象这种机制的其他重要用途,例如实现单例模式或执行清理函数。

这种专用锁很容易使用,以确保通道只关闭一次:

func safelyClose(once *sync.Once, ch chan struct{}) {
    fmt.Println("Hello, friend!")
    once.Do(func() {
        fmt.Println("Channel closed.")
        close(ch)
    })
}

func main() {
    var once sync.Once
    ch := make(chan struct{})
    for i := 0; i < 3; i++ {
        go safelyClose(&once, ch)
    }
    <-ch
    fmt.Println("Goodbye, friend!")
}

我们通过将关闭操作包装在 sync.Once 中来实现通道的安全关闭:

  1. 我们创建了 safelyClose 函数,该函数接收一个指向 sync.Once 类型的指针和由 main 函数创建的通道。请注意,与通道类型不同,我们需要使用显式参数指针类型传递 Once 类型。

  2. safelyClose 函数内部,我们在 once.Do 方法中调用通道的关闭操作。Do 方法接收一个函数作为参数,因此我们将语句包装在一个匿名函数中。

  3. main 函数内部,我们创建了一个零值的 sync.Once 实例。同步原语不需要特殊的初始化,因此零值可以直接使用。

  4. 我们使用 for 循环创建多个执行 safelyClose 函数的 goroutine。这些 goroutine 共享相同的 once 和通道实例。

  5. 最后,我们通过从通道接收操作来阻塞主 goroutine。一旦第一个 goroutine 关闭通道,该操作将完成。

运行示例程序显示启动了多个 goroutine,但通道只关闭一次:

$ go run chapter09/concurrency/once/main.go
Hello, friend!
Channel closed.
Hello, friend!
Goodbye, friend!
Hello, friend!

从输出中可以看出,启动了多个 goroutine,但通道只关闭了一次。sync.Once 使用简单,但它可以帮助我们在只应执行一次的操作(例如关闭通道)周围构建安全性。

线程安全的数据结构

工程师经常解决的另一个问题是构建线程安全的数据结构。这些类型的数据结构可以安全地被多个 goroutine 读取和写入。默认情况下,Go 的切片和映射不是并发安全的,因此我们需要注意多个 goroutine 访问共享数据结构和资源的情况。这也是为什么使用通道(它们是线程安全的)进行通信比通过共享内存(由数据结构或变量表示)进行通信更受青睐的原因之一。

sync.Maphttps://pkg.go.dev/sync#Map )是一个线程安全的映射实现。该映射在底层使用锁,因此它的性能不如内置的映射类型。同步映射暴露了提供读写功能的方法:

const workerCount = 3

func greet(id int, smap *sync.Map, done chan struct{}) {
    g := fmt.Sprintf("Hello, friend! I'm Goroutine %d.", id)
    smap.Store(id, g)
    done <- struct{}{}
}

func main() {
    var smap sync.Map
    done := make(chan struct{})
    for i := 0; i < workerCount; i++ {
        go greet(i, &smap, done)
    }
    for i := 0; i < workerCount; i++ {
        <-done
    }
    smap.Range(func(key, value any) bool {
        fmt.Println(value)
        return true
    })
    fmt.Println("Goodbye, friend!")
}

我们通过包装方法与同步映射进行交互:

  1. 我们在程序顶部声明了 workerCount 常量,它表示我们将启动的 goroutine 数量。

  2. greet 函数接收三个参数:一个 ID、一个指向 sync.Map 的指针以及一个用于通知主 goroutine 工作完成的通道。我们格式化一个问候字符串,使用传入的 ID,然后使用 Store 方法将其保存到映射中,并向 done 通道写入一个值,以通知主 goroutine 该工作 goroutine 已完成。

  3. main 函数内部,我们初始化了映射。与之前看到的 sync.Once 一样,该映射的零值可以直接使用。我们还初始化了一个通道,用于通知主 goroutine 工作 goroutine 已完成。

  4. 然后,我们运行两个 for 循环。第一个循环在各自的 goroutine 中启动 greet 函数,而第二个循环等待从 done 通道接收值。这使我们能够等待所有 goroutine 完成后再继续。

  5. 最后,我们使用 Range 方法读取映射中包含的所有值,该方法接收一个匿名函数作为参数。我们打印条目并返回 true,这将允许 Range 方法继续循环。

该程序的输出显示问候语可以并发地保存和检索:

$ go run chapter09/concurrency/syncmap/main.go
Hello, friend! I'm Goroutine 2.
Hello, friend! I'm Goroutine 0.
Hello, friend! I'm Goroutine 1.
Goodbye, friend!

内置的映射类型在多个 goroutine 写入时会引发 panic,因此在这种情况下应确保使用同步映射。

类似于 sync.Map 的方法,我们可以使用 sync.Mutex 锁来创建自己的线程安全自定义数据结构,以限制对底层数据结构的访问。例如,我们可以通过以下方法创建一个线程安全的 后进先出(LIFO)栈

// 线程安全的 LIFO 栈实现
type Stack struct {
    lock sync.Mutex
    data []string
}

// Push 将给定元素添加到列表末尾
func (s *Stack) Push(el string) {
    defer s.lock.Unlock()
    s.lock.Lock()
    s.data = append(s.data, el)
}

// Pop 移除并返回列表中的最后一个元素,
// 如果列表为空,则返回错误。
func (s *Stack) Pop() (*string, error) {
    defer s.lock.Unlock()
    s.lock.Lock()
    if len(s.data) == 0 {
        return nil, fmt.Errorf("stack is empty")
    }
    last := s.data[len(s.data)-1]
    s.data = s.data[0 : len(s.data)-1]
    return &last, nil
}

栈实现使用了 sync.Mutex,它暴露了 LockUnlock 两个方法,以限制对底层数据切片的访问:

  1. 自定义的 Stack 结构有两个字段:一个锁和一个数据切片。这些是未导出的字段,因为它们应该仅由栈数据结构本身管理。

  2. Stack 有两个方法。Push 将元素添加到数据切片的末尾,而 Pop 从数据切片中移除最后一个元素并返回它。如果切片为空,则 Pop 方法将返回错误。

  3. 两个函数都使用 sync.Mutex 类型的锁来确保这两个方法一次只能由一个 goroutine 调用。我们使用 defer 关键字来确保无论方法通过哪个执行路径,锁都会被释放。

sync.Mutex 是一种多功能的锁定机制,可用于阻止访问任何访问共享资源或需要唯一控制资源的代码段。这被称为 关键代码段

类似地,sync 包还提供了 sync.RWMutex,它提供了分别控制读取和写入锁的能力。这种控制级别可能对创建由许多 goroutine 使用的线程安全数据结构很有用。

等待完成

在本节中,我们将探讨的最后一个同步原语是 sync.WaitGroup。在底层,WaitGroup 管理一个内部计数器,该计数器维护还有多少资源需要完成。这种专用锁允许我们等待多个 goroutine 完成,从而使我们能够简化上一节中的同步映射示例:

const workerCount = 3

func greet(id int, smap *sync.Map, wg *sync.WaitGroup) {
    defer wg.Done()
    g := fmt.Sprintf("Hello, friend! I'm Goroutine %d.", id)
    smap.Store(id, g)
}

func main() {
    var smap sync.Map
    var wg sync.WaitGroup
    wg.Add(workerCount)
    for i := 0; i < workerCount; i++ {
        go greet(i, &smap, &wg)
    }
    wg.Wait()
    smap.Range(func(key, value any) bool {
        fmt.Println(value)
        return true
    })
    fmt.Println("Goodbye, friend!")
}

我们进行了一些关键更改,大大简化了解决方案:

  1. greet 函数接收一个指向 sync.WaitGroup 的指针,而不是 done 通道。在函数顶部,我们使用 defer 调用 WaitGroupDone 方法,该方法将其内部计数器减 1,表示该 goroutine 已完成。

  2. main 函数内部,我们初始化了 sync.WaitGroup,它已准备好使用。我们将 workerCount 添加到内部计数器中,向它发出信号,表明我们将启动多少个 goroutine。WaitGroup 将阻塞,直到此内部计数器达到 0,这将在每个子 goroutine 完成时调用一次 Done 方法时发生。

  3. 最后,我们在 main 函数中调用 Wait 方法。该方法将阻塞,直到 WaitGroup 的内部计数器达到 0。这消除了在 for 循环中为每个完成的 goroutine 从通道读取消息的需要。

至此,我们结束了对 Go 并发基础知识和应用的探索。正如我们所看到的,Go 并发利用了 goroutine、通道和同步原语。我们可以轻松地使用锁来创建线程安全的数据结构,并确保关键代码段一次只能由一个 goroutine 访问。在下一节中,我们将学习并发引入的新问题给我们的程序带来了哪些挑战。