golang 没有简单普遍的方式来检查channel是否已经关闭了 关闭已经关闭的channel会导致panic 发送值到已经关闭的channel会导致panic 一个channel 关闭的原则是不要从接收端关闭channel,也不要关闭有多个并发发送者的channel。【别人可能还写呢】
换句话说,如果sender(发送者)只是唯一的sender或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知receivers(接收者们)已经没有值可以读了。
1. 不建议方案 1.1 recover 兜底 如果你因为某种原因从接收端(receiver side)关闭channel或者在多个发送者中的一个关闭channel,那么你应该使用列在Golang panic/recover Use Cases 的函数来安全地发送值到channel中(假设channel的元素类型是T)
1 2 3 4 5 6 7 8 9 10 11 12 func SafeSend (ch chan T, value T) (closed bool ) { defer func () { if recover () != nil { closed = true } }() ch <- value return false }
同样的想法也可以用在从多个goroutine关闭channel中:
1 2 3 4 5 6 7 8 9 10 11 func SafeClose (ch chan T) (justClosed bool ) { defer func () { if recover () != nil { justClosed = false } }() close (ch) return true }
1.2 sync.Once 很多人喜欢用sync.Once
来关闭channel:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type MyChannel struct { C chan T once sync.Once } func NewMyChannel () *MyChannel { return &MyChannel{C: make (chan T)} } func (mc *MyChannel) SafeClose() { mc.once.Do(func () { close (mc.C) }) }
要知道golang的设计者不提供SafeClose或者SafeSend方法是有原因的,他们本来就不推荐在消费端或者在并发的多个生产端关闭channel,比如关闭只读channel在语法上就彻底被禁止使用了。
2. 优雅的关闭Channel 2.1 单个生产者,多个消费者 这种情况最简单,直接让生产者关闭channel好了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package mainimport ( "time" "math/rand" "sync" "log" ) func main () { rand.Seed(time.Now().UnixNano()) log.SetFlags(0 ) const MaxRandomNumber = 100000 const NumReceivers = 100 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) dataCh := make (chan int , 100 ) go func () { for { if value := rand.Intn(MaxRandomNumber); value == 0 { close (dataCh) return } else { dataCh <- value } } }() for i := 0 ; i < NumReceivers; i++ { go func () { defer wgReceivers.Done() for value := range dataCh { log.Println(value) } }() } wgReceivers.Wait() }
2.2 多个生产者,单个消费者 这种情况要比上面的复杂一点。我们不能在消费端关闭channel,因为这违背了channel关闭原则。
我们可以让消费端关闭一个附加的信号来通知发送端停止生产数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package mainimport ( "time" "math/rand" "sync" "log" ) func main () { rand.Seed(time.Now().UnixNano()) log.SetFlags(0 ) const MaxRandomNumber = 100000 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(1 ) dataCh := make (chan int , 100 ) stopCh := make (chan struct {}) for i := 0 ; i < NumSenders; i++ { go func () { for { select { case <- stopCh: return default : } select { case <- stopCh: return case dataCh <- rand.Intn(MaxRandomNumber): } } }() } go func () { defer wgReceivers.Done() for value := range dataCh { if value == MaxRandomNumber-1 { close (stopCh) return } log.Println(value) } }() wgReceivers.Wait() }
就上面这个例子,生产者同时也是退出信号channel的接受者。退出信号channel仍然是由它的生产端关闭的,所以这仍然没有违背channel关闭原则。
值得注意的是,这个例子中生产端和接受端都没有关闭消息数据的channel,channel在没有任何goroutine引用的时候会自行关闭,而不需要显示进行关闭。
2.3 多个生产者,多个消费者 这是最复杂的一种情况,我们既不能让接受端也不能让发送端关闭channel。我们甚至都不能让接受者关闭一个退出信号来通知生产者停止生产。因为我们不能违反channel关闭原则。
但是我们可以引入一个额外的协调者(主持人),来关闭附加的退出信号channel。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 package mainimport ( "time" "math/rand" "sync" "log" "strconv" ) func main () { rand.Seed(time.Now().UnixNano()) log.SetFlags(0 ) const MaxRandomNumber = 100000 const NumReceivers = 10 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) dataCh := make (chan int , 100 ) stopCh := make (chan struct {}) toStop := make (chan string , 1 ) var stoppedBy string go func () { stoppedBy = <- toStop close (stopCh) }() for i := 0 ; i < NumSenders; i++ { go func (id string ) { for { value := rand.Intn(MaxRandomNumber) if value == 0 { select { case toStop <- "sender#" + id: default : } return } select { case <- stopCh: return default : } select { case <- stopCh: return case dataCh <- value: } } }(strconv.Itoa(i)) } for i := 0 ; i < NumReceivers; i++ { go func (id string ) { defer wgReceivers.Done() for { select { case <- stopCh: return default : } select { case <- stopCh: return case value := <-dataCh: if value == MaxRandomNumber-1 { select { case toStop <- "receiver#" + id: default : } return } log.Println(value) } } }(strconv.Itoa(i)) } wgReceivers.Wait() log.Println("stopped by" , stoppedBy) }
在这个例子中,仍然遵守着channel closing principle。 请注意channel toStop
的缓冲大小是1.这是为了避免当主持人 goroutine 准备好之前第一个通知就已经发送了,导致丢失。
楼主你好, 关于第三个例子有些问题请教。参考:https://www.jianshu.com/p/d24dfbb33781
value==0时, 为什么还要加个select, 不能直接发送给toStop吗? 1 2 3 4 5 6 7 8 if value == 0 { select { case toStop <- "sender#" + id: default : } return }
因为可能多个生产者或者多个消费者满足条件, 防止阻塞。(命中了default可以往下走)
select stopCh 为什么写了两次? 第一个select可以省略吗? 1 2 3 4 5 6 7 8 9 10 11 select {case <- stopCh: return default :} select {case <- stopCh: return case dataCh <- value:}
为了尽早退出, 因为第二个 select有可能 select 到dataCh, 虽然已经通知关闭了
toStop的缓冲大小是1, 为了避免准备好之前通知就发送了怎么理解??
请注意channel toStop的缓冲大小是1.这是为了避免当主持人 goroutine 准备好之前第一个通知就已经发送了,导致丢失。
因为有缓冲的 发送 happens_before 接收之前, 所以主持人能保证接收到数据
无缓冲的 接收 happens_before 发送之间, 可能会丢失数据
3. 参考资料