简介

sync包提供了基本的同步基元,如互斥锁。除了Once和WaitGroup类型,大部分都是适用于低水平程序线程,高水平的同步使用channel通信更好一些。

本包的类型的值不应被拷贝。

主要类型有:Locker, Cond, Map, Mutex, Once, Pool,RWMutex, WaitGroup

Locker

Locker接口代表一个可以加锁和解锁的对象。

1
2
3
4
5
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}

Once

Once是只执行一次动作的对象。

Once 是一个可以被多次调用但是只执行一次,若每次调用Do时传入参数f不同,但是只有第一个才会被执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}

// 如果你执行这段代码会发现,虽然调用了10次,但是只执行了1次。
// Output:
Only once

Do方法

1
func (o *Once) Do(f func())

Do方法当且仅当第一次被调用时才执行函数f。换句话说,给定变量:

1
var once Once

如果once.Do(f)被多次调用,只有第一次调用会执行f,即使f每次调用Do 提供的f值不同。需要给每个要执行仅一次的函数都建立一个Once类型的实例。

Do用于必须刚好运行一次的初始化。因为f是没有参数的,因此可能需要使用闭包来提供给Do方法调用:

1
config.once.Do(func() { config.init(filename) })

因为只有f返回后Do方法才会返回,f若引起了Do的调用,会导致死锁。

互斥锁

type Mutex

1
2
3
type Mutex struct {
// 包含隐藏或非导出字段
}

Mutex是一个互斥锁,可以创建为其他结构体的字段;零值为解锁状态。Mutex类型的锁和线程无关,可以由不同的线程加锁和解锁。 第一次使用后不得复制 Mutex

func (*Mutex) Lock

1
func (m *Mutex) Lock()

Lock方法锁住m,如果m已经加锁,则阻塞直到m解锁。

func (*Mutex) Unlock

1
func (m *Mutex) Unlock()

Unlock方法解锁m,如果m未加锁会导致运行时错误。锁和线程无关,可以由不同的线程加锁和解锁。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"fmt"
"sync"
)

type SafeInt struct {
sync.Mutex
Num int
}

func main() {
waitNum := 10 // 设置等待的个数(继续往下看)

count := SafeInt{}

done := make(chan bool)

for i := 0; i < waitNum; i++ {
go func(i int) {
fmt.Print(i)
count.Lock() // 加锁,防止其它例程修改 count
count.Num = count.Num + i
fmt.Print(" ",count.Num, " \n")
count.Unlock()

done <- true
}(i)
}

for i := 0; i < waitNum; i++ {
<-done
}
}

输出结果:
9 9
0 9
2 11
7 18
8 26
4 30
3 33
6 39
5 44
1 45

多次输出结果不一致,但和是45

读写互斥锁

type RWMutex

1
2
3
type RWMutex struct {
// 包含隐藏或非导出字段
}

RWMutex是读写互斥锁。该锁可以被同时多个读取者持有或唯一个写入者持有。RWMutex可以创建为其他结构体的字段;零值为解锁状态。RWMutex类型的锁也和线程无关,可以由不同的线程加读取锁/写入和解读取锁/写入锁。

func (*RWMutex) Lock

1
func (rw *RWMutex) Lock()

Lock方法将rw锁定为写入状态,禁止其他线程读取或者写入。

func (*RWMutex) Unlock

1
func (rw *RWMutex) Unlock()

Unlock方法解除rw的写入锁状态,如果m未加写入锁会导致运行时错误。

func (*RWMutex) RLock

1
func (rw *RWMutex) RLock()

RLock方法将rw锁定为读取状态,禁止其他线程写入,但不禁止读取。

func (*RWMutex) RUnlock

1
func (rw *RWMutex) RUnlock()

Runlock方法解除rw的读取锁状态,如果m未加读取锁会导致运行时错误。

func (*RWMutex) RLocker

1
func (rw *RWMutex) RLocker() Locker

Rlocker方法返回一个互斥锁,通过调用rw.Rlock和rw.Runlock实现了Locker接口。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"fmt"
"sync"
"time"
)

var m *sync.RWMutex
var wg sync.WaitGroup

func main() {
m = new(sync.RWMutex)
wg.Add(2)
go write(1)
time.Sleep(1 * time.Second)
go read(2)
wg.Wait()
}
func write(i int) {
fmt.Println(i, "写开始.")
m.Lock()
fmt.Println(i, "正在写入中......")
time.Sleep(3 * time.Second)
m.Unlock()
fmt.Println(i, "写入结束.")
wg.Done()
}
func read(i int) {
fmt.Println(i, "读开始.")
m.RLock()
fmt.Println(i, "正在读取中......")
time.Sleep(1 * time.Second)
m.RUnlock()
fmt.Println(i, "读取结束.")
wg.Done()
}

并发等待

type WaitGroup

1
2
3
4
5
6
7
8
9
10
11
12
type WaitGroup struct {
// 包含隐藏或非导出字段
}

// 计数器增加 delta,delta 可以是负数。
func (wg *WaitGroup) Add(delta int)

// 计数器减少 1
func (wg *WaitGroup) Done()

// 等待直到计数器归零。如果计数器小于 0,则该操作会引发 panic。
func (wg *WaitGroup) Wait()

WaitGroup用于等待一组线程的结束。父线程调用Add方法来设定应等待的线程的数量。每个被等待的线程在结束时应调用Done方法。同时,主线程里可以调用Wait方法阻塞至所有线程结束。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string) {
// Decrement the counter when the goroutine completes.
defer wg.Done()
// Fetch the URL.
http.Get(url)
}(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()

func (*WaitGroup) Add

1
func (wg *WaitGroup) Add(delta int)

Add方法向内部计数加上delta,delta可以是负数;如果内部计数器变为0,Wait方法阻塞等待的所有线程都会释放,如果计数器小于0,方法panic。注意Add加上正数的调用应在Wait之前,否则Wait可能只会等待很少的线程。一般来说本方法应在创建新的线程或者其他应等待的事件之前调用。

func (*WaitGroup) Done

1
func (wg *WaitGroup) Done()

Done方法减少WaitGroup计数器的值,应在线程的最后执行。

func (*WaitGroup) Wait

1
func (wg *WaitGroup) Wait()

Wait方法阻塞直到WaitGroup计数器减为0。

Cond

sync.Cond用于goroutine之间的协作,用于协程的挂起和唤醒。

1
2
3
4
5
6
7
8
type Cond struct {
noCopy noCopy // noCopy可以嵌入到结构中,在第一次使用后不可复制,使用go vet作为检测使用

L Locker // 根据需求初始化不同的锁,如*Mutex 和 *RWMutex

notify notifyList // 通知列表,调用Wait()方法的goroutine会被放入list中,每次唤醒,从这里取出
checker copyChecker // 复制检查,检查cond实例是否被复制
}

func NewCond

1
func NewCond(l Locker) *Cond

使用锁l创建一个*Cond。

func (*Cond) Broadcast

1
2
3
4
5
6
func (c *Cond) Broadcast() {
// 检查c是否是被复制的,如果是就panic
c.checker.check()
// 唤醒等待队列中所有的goroutine
runtime_notifyListNotifyAll(&c.notify)
}

Broadcast唤醒所有等待c的线程。调用者在调用本方法时,建议(但并非必须)保持c.L的锁定。

功能:唤醒等待队列中的所有goroutine

func (*Cond) Signal

1
2
3
4
5
6
func (c *Cond) Signal() {
// 检查c是否是被复制的,如果是就panic
c.checker.check()
// 通知等待列表中的一个
runtime_notifyListNotifyOne(&c.notify)
}

Signal唤醒等待c的一个线程(如果存在)。调用者在调用本方法时,建议(但并非必须)保持c.L的锁定。

功能:唤醒等待队列中的一个goroutine,一般都是任意唤醒队列中的一个goroutine

func (*Cond) Wait

1
2
3
4
5
6
7
8
9
10
11
func (c *Cond) Wait() {
// 检查c是否是被复制的,如果是就panic
c.checker.check()
// 将当前goroutine加入等待队列
t := runtime_notifyListAdd(&c.notify)
// 解锁
c.L.Unlock()
// 等待队列中的所有的goroutine执行等待唤醒操作
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}

Wait自行解锁c.L并阻塞当前线程,在之后线程恢复执行时,Wait方法会在返回前锁定c.L。和其他系统不同,Wait除非被Broadcast或者Signal唤醒,不会主动返回。

因为线程中Wait方法是第一个恢复执行的,而此时c.L未加锁。调用者不应假设Wait恢复时条件已满足,相反,调用者应在循环中等待:

1
2
3
4
5
6
c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()

功能: 必须获取该锁之后才能调用Wait()方法,Wait方法在调用时会释放底层锁Locker,并且将当前goroutine挂起,直到另一个goroutine执行Signal或者Broadcase,该goroutine才有机会重新唤醒,并尝试获取Locker,完成后续逻辑。

也就是在等待被唤醒的过程中是不占用锁Locker的,这样就可以有多个goroutine可以同时处于Wait(等待被唤醒的状态)

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"sync"
"fmt"
"time"
)

func main() {
locker := new(sync.Mutex)
cond := sync.NewCond(locker)

for i := 0 ; i < 30 ; i++ {
go func(x int) {
cond.L.Lock()
fmt.Println(x," 获取锁")
defer cond.L.Unlock()
cond.Wait()
fmt.Println(x," 被唤醒")
time.Sleep(time.Second)
}(i)
}


time.Sleep(time.Second)
fmt.Println("Signal...")
cond.Signal()
time.Sleep(time.Second)
cond.Signal()
time.Sleep(time.Second*3)
cond.Broadcast()

fmt.Println("Broadcast...")
time.Sleep(time.Minute)
}

Pool

Golang中sync.Pool用来提高对象复用几率,减少gc的压力,减少内存分配,它是线程安全的,常用来存储并复用临时对象。

1
2
3
4
5
6
7
8
type Pool struct {
noCopy noCopy //该对象不能被copy使用

local unsafe.Pointer // [p]poolLocal,固定长度
localSize uintptr //本地缓冲池poolLocal的数量

New func() interface{} //用户自定义的用于生成对象的方法
}

Pool是一个可以分别存取的临时对象的集合。

Pool中保存的任何item都可能随时不做通告的释放掉。如果Pool持有该对象的唯一引用,这个item就可能被回收。

Pool可以安全的被多个线程同时使用。

Pool的目的是缓存申请但未使用的item用于之后的重用,以减轻GC的压力。也就是说,让创建高效而线程安全的空闲列表更容易。但Pool并不适用于所有空闲列表。

Pool的合理用法是用于管理一组静静的被多个独立并发线程共享并可能重用的临时item。Pool提供了让多个线程分摊内存申请消耗的方法。

Pool的一个好例子在fmt包里。该Pool维护一个动态大小的临时输出缓存仓库。该仓库会在过载(许多线程活跃的打印时)增大,在沉寂时缩小。

另一方面,管理着短寿命对象的空闲列表不适合使用Pool,因为这种情况下内存申请消耗不能很好的分配。这时应该由这些对象自己实现空闲列表。

func (*Pool) Get

作用:从Pool中获取一个对象,如果获取不到并且New函数不为空,则通过New创建一个对象并返回。否则返回nil

1
func (p *Pool) Get() interface{}

Get方法从池中选择任意一个item,删除其在池中的引用计数,并提供给调用者。Get方法也可能选择无视内存池,将其当作空的。调用者不应认为Get的返回这和传递给Put的值之间有任何关系。

假使Get方法没有取得item:如p.New非nil,Get返回调用p.New的结果;否则返回nil。

func (*Pool) Put

1
func (p *Pool) Put(x interface{})

Put方法将x放入池中。