
Golang | 上一篇纠错
updated on July, 19th, 2020
上一篇我以为自己很聪明的做了benchmark,判断了用mutex更快,觉得自己很牛逼。但是实际上是我的解决方案有问题,channel用错了方法。
今天改了解决方案,让channel传输结果就好了。
先说正确结论
用不用channel,什么时候用channel
Channel和Mutex的主要区别不在于实现方法,而是编码逻辑,实际上channel内部还是用了锁,所以讨论channel比mutex好多少是错误的。
我们应该尽可能多的把计算中的读写集中在一个线程中,最后只是汇总结果。
这有两个好处,一个是可以尽可能少的访问锁,另一个是CPU缓存击中的可能性更高,两个都可以提高性能。
解决方案的更新
我之前一直在想,为什么Exercism已经提供了一个Frequency的实现。今天才想清楚,因为我们只需要考虑并行的部分。
为什么要这么做
其实前面已经提到了,主要是考虑各种操作的耗时。总的来说不加锁比加锁快,加锁比加了锁还要等待快。
那么回过头来看,我之前的Channel解法有什么问题呢?
// ConcurrentFrequencyChan calculate the frequency concurrently(wrong version)
func ConcurrentFrequencyChan(strings []string) FreqMap {
m := FreqMap{}
ch := make(chan rune)
// use wg to wait for concurrent tasks
wg := &sync.WaitGroup{}
writerWg := &sync.WaitGroup{}
writerWg.Add(1)
wg.Add(len(strings))
for _, str := range strings {
go func(s string) {
for _, r := range s {
ch <- r
}
wg.Done()
}(str)
}
go func() {
for r := range ch {
m[r]++
}
writerWg.Done()
}()
wg.Wait()
// close the channel to prevent memory leak
close(ch)
writerWg.Wait()
return m
}
看起来我们利用了chan,把步骤拆分开了,绕开了锁,理论上应该更快才对。
但是实际上我们在推对象进channel的时候,就已经加了锁。在go1.14.2源码中,/src/runtime/chan.go
里面有channel具体的实现,类型为runtime.hchan
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
...
recvq waitq
sendq waitq
lock mutex
}
我们可以看到除了buffer以外,实际上channel维护了两个队列,有一个锁。
在runtime.chansend
函数中,我们可以看到实际上在发送时,我们就会持有channel的锁,完成发送以后释放,如果阻塞、协程切出了,这个锁还会被用作协程的锁。
删过的代码如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
//...
// parking the coroutine
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// resume. the lock is release
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
也就相当于我们实际上还是非常频繁的使用了mutex。让这个函数更慢的是协程调度中的上下文切换,goroutine一直在锁channel等待有人写入从而恢复执行,这个调度开支是非常大的。
正确的解法是:
func ConcurrentFrequency(strings []string) FreqMap {
m := FreqMap{}
ch := make(chan FreqMap)
// use wg to wait for concurrent tasks
for idx := 0; idx < len(strings); idx++ {
go func(i int) {
ch <- Frequency(strings[i])
}(idx)
}
for idx := 0; idx < len(strings); idx++ {
for k, v := range <-ch {
m[k] += v
}
}
// close the channel to prevent memory leak
close(ch)
return m
}
应该让调用频繁的地方lock free
Send cerebra only. 只送大脑。 - Thomas Wade(托马斯·维德)