Golang | 上一篇纠错

2020 05 24, Sun

updated on July, 19th, 2020

上一篇我以为自己很聪明的做了benchmark,判断了用mutex更快,觉得自己很牛逼。但是实际上是我的解决方案有问题,channel用错了方法。

今天改了解决方案,让channel传输结果就好了。

先说正确结论

用不用channel,什么时候用channel

Channel和Mutex的主要区别不在于实现方法,而是编码逻辑,实际上channel内部还是用了锁,所以讨论channel比mutex好多少是错误的。

我们应该尽可能多的把计算中的读写集中在一个线程中,最后只是汇总结果。

这有两个好处,一个是可以尽可能少的访问锁,另一个是CPU缓存击中的可能性更高,两个都可以提高性能。

解决方案的更新

我之前一直在想,为什么Exercism已经提供了一个Frequency的实现。今天才想清楚,因为我们只需要考虑并行的部分。

为什么要这么做

其实前面已经提到了,主要是考虑各种操作的耗时。总的来说不加锁比加锁快,加锁比加了锁还要等待快。

那么回过头来看,我之前的Channel解法有什么问题呢?

// ConcurrentFrequencyChan calculate the frequency concurrently(wrong version)
func ConcurrentFrequencyChan(strings []string) FreqMap {
	m := FreqMap{}
	ch := make(chan rune)

	// use wg to wait for concurrent tasks
	wg := &sync.WaitGroup{}
	writerWg := &sync.WaitGroup{}

	writerWg.Add(1)
	wg.Add(len(strings))
	for _, str := range strings {
		go func(s string) {
			for _, r := range s {

				ch <- r
			}
			wg.Done()
		}(str)
	}
	go func() {
		for r := range ch {
			m[r]++
		}
		writerWg.Done()
	}()
    wg.Wait()
    // close the channel to prevent memory leak
	close(ch)

	writerWg.Wait()

	return m
}

看起来我们利用了chan,把步骤拆分开了,绕开了锁,理论上应该更快才对。

但是实际上我们在推对象进channel的时候,就已经加了锁。在go1.14.2源码中,/src/runtime/chan.go里面有channel具体的实现,类型为runtime.hchan

type hchan struct {
    qcount   uint
    dataqsiz uint
    buf      unsafe.Pointer
    ...
    recvq    waitq
    sendq    waitq

    lock     mutex
}

我们可以看到除了buffer以外,实际上channel维护了两个队列,有一个锁。

runtime.chansend函数中,我们可以看到实际上在发送时,我们就会持有channel的锁,完成发送以后释放,如果阻塞、协程切出了,这个锁还会被用作协程的锁。

删过的代码如下:


func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // ...

    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.

    //...

    // parking the coroutine

    c.sendq.enqueue(mysg)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // resume. the lock is release

    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

也就相当于我们实际上还是非常频繁的使用了mutex。让这个函数更慢的是协程调度中的上下文切换,goroutine一直在锁channel等待有人写入从而恢复执行,这个调度开支是非常大的。

正确的解法是:

func ConcurrentFrequency(strings []string) FreqMap {
	m := FreqMap{}
	ch := make(chan FreqMap)

	// use wg to wait for concurrent tasks
	for idx := 0; idx < len(strings); idx++ {
		go func(i int) {
			ch <- Frequency(strings[i])
		}(idx)
	}

	for idx := 0; idx < len(strings); idx++ {
		for k, v := range <-ch {
			m[k] += v
		}
	}
	// close the channel to prevent memory leak
	close(ch)

	return m
}

应该让调用频繁的地方lock free

Send cerebra only. 只送大脑。 - Thomas Wade(托马斯·维德)