下次想在Golang中写个并发处理,就用这个模板,准没错!
引言
要说 Golang 中最引以为傲的特性,那非 goroutine 莫属,goroutine(协程)很轻量,相比于每个线程要使用 1MB 的内存,每个 go 协程只需要 1kb 左右就够了。
于是,在需要做并发处理的时候,很自然的就想着,go 一下就好了吗? 示例代码如下
for i:=0; i < 5; i++ {
go func(index int) {
fmt.Println(index)
}(i) //这里为什么要把i传进来呢?
}
这样可以并发处理请求了是不假,但如果其中一个请求出错了,需要退出怎么办了? 一方面,可以自己实现这个错误处理(稍后会写),另一方面,也可以直接用 golang 官方errgroup
errorgoup 是个好东东
上面的示例代码,如果用 errgroup 来重新实现,会是下面这个样子
g, _ := errgroup.WithContext(context.Background())
for i := 0; i < 5; i++ {
index := i
g.Go(func() error {
fmt.Println(index)
return nil // 如果想Mock一些错误,也可以return一个error
})
}
if err = g.Wait(); err != nil {
return err
}
是不是还挺简单的?感兴趣的,可以自行搜下源码,除去注释只有大概 30 行代码,还是很好理解的。
现在错误处理也有了,是不是就完美了呢?
这个问题就要看有请求要并发处理了,协程虽然很轻量,但也还是要耗费一些资源的,如果可以预见到有几百上千的请求的要处理,那就需要协程池来复用协程,达到节省资源的目的了。
网上有很多协程池的实现,大都做的大而全,考虑了很多场景,但实际编码场景中,很可能只是为了解决一个小问题,就引入一个包,实在觉得有些太重了呢,而且可能还不够灵活。
有没有一个简单的模板,可以 copy/paste/tweak 一下呢?这就来了
一个简单实用的模板
闲话少絮,直接上代码先,关键部分会在代码中加注释解释。
var (
err error
outputs []int
workers = 4 //协程的数量,可以按需设置,一般不大于runtime.NumCPU()
workChannel = make(chan int)
errChannel = make(chan error, workers)
wg = &sync.WaitGroup{}
mux = sync.Mutex{}
)
worker := func(input int) (int, error){
retrun input, nil //如果想Mock一些错误,也可以return一个error
}
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
for input := range workChannel { // workChannel被close时,这个循环就会退出
output, err:=worker(input)
if err != nil {
errChannel <- err
break
}
mux.Lock() //使用lock保护outputs,来搜集执行结果,如果不需要可以删除
outputs = append(outputs, output)
mux.Unlock()
}
}()
}
loop:
for _, input := range inputs {
select {
case workChannel <- input:
case err = <-errChannel:
break loop
}
}
close(workChannel) //关闭workChannel,可以让工作协程,在处理完当前任务后退出
wg.Wait()
// 关于select case,如果有多个case满足时,会选择随机进入一个case的,所以需要再检查一次,双重保险
if err == nil {
select {
case err = <-errChannel:
default:
}
}
return outputs, err
代码看着是多了些,但在实际使用过程中,按需要改下worker
函数输入和输出的类型即可。
如果 copy/paste 也不想做,那就只能封装一下了,但是因为现在 golang 还没正式推出范型,只能用inteface{}
了,看着是不大好看,使用时,也要自己转来转去的,不过可以凑合着用啦。
话说,封装好的代码在这 parallel_runner.go,需要的自取了。
关于 context
也许有人会问,为什么不用context.WithCancel()
,然后在出现错误的时候 cancel 一下?
讲究一点的话,确实应该用,但那也意味在在worker
函数中,你也要检查ctx.Done()
, 我不用还是因为懒了……
不过,我在封装后的代码parallel_runner.go里,是有加入 context 啦,看着就是更复杂了些……
写在最后
之前还写过一些关于 channel 的使用的文章,
- 如何把 Golang 的 channel 用的如 nodejs 的 stream 一样丝滑
- 如何用 Golang 的 channel 实现消息的批量处理
- 如何把 golang 的 Channel 玩出 async 和 await 的 feel
里面实现的轻量级 util 都开源在channelx,欢迎大家审阅,如果有你喜欢用的工具,欢迎点个赞或者 star :)