下次想在Golang中写个并发处理,就用这个模板,准没错!

引言

要说 Golang 中最引以为傲的特性,那非 goroutine 莫属,goroutine(协程)很轻量,相比于每个线程要使用 1MB 的内存,每个 go 协程只需要 1kb 左右就够了。

于是,在需要做并发处理的时候,很自然的就想着,go 一下就好了吗? 示例代码如下

	for i:=0; i < 5; i++ {
		go func(index int) {
			fmt.Println(index)
		}(i) //这里为什么要把i传进来呢?
	}

这样可以并发处理请求了是不假,但如果其中一个请求出错了,需要退出怎么办了? 一方面,可以自己实现这个错误处理(稍后会写),另一方面,也可以直接用 golang 官方errgroup

errorgoup 是个好东东

上面的示例代码,如果用 errgroup 来重新实现,会是下面这个样子

	g, _ := errgroup.WithContext(context.Background())

	for i := 0; i < 5; i++ {
	    index := i
		g.Go(func() error {
		    fmt.Println(index)
		    return nil // 如果想Mock一些错误,也可以return一个error
		})
	}

	if err = g.Wait(); err != nil {
		return err
	}

是不是还挺简单的?感兴趣的,可以自行搜下源码,除去注释只有大概 30 行代码,还是很好理解的。

现在错误处理也有了,是不是就完美了呢?

这个问题就要看有请求要并发处理了,协程虽然很轻量,但也还是要耗费一些资源的,如果可以预见到有几百上千的请求的要处理,那就需要协程池来复用协程,达到节省资源的目的了。

网上有很多协程池的实现,大都做的大而全,考虑了很多场景,但实际编码场景中,很可能只是为了解决一个小问题,就引入一个包,实在觉得有些太重了呢,而且可能还不够灵活。

有没有一个简单的模板,可以 copy/paste/tweak 一下呢?这就来了

一个简单实用的模板

闲话少絮,直接上代码先,关键部分会在代码中加注释解释。

	var (
		err         error
		outputs     []int
		workers     = 4 //协程的数量,可以按需设置,一般不大于runtime.NumCPU()
		workChannel = make(chan int)
		errChannel  = make(chan error, workers)
		wg          = &sync.WaitGroup{}
		mux         = sync.Mutex{}
	)

	worker := func(input int) (int, error){
	    retrun input, nil //如果想Mock一些错误,也可以return一个error
	}

	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for input := range workChannel { // workChannel被close时,这个循环就会退出
				output, err:=worker(input)
				if err != nil {
					errChannel <- err
					break
				}

				mux.Lock() //使用lock保护outputs,来搜集执行结果,如果不需要可以删除
				outputs = append(outputs, output)
				mux.Unlock()
			}
		}()
	}

loop:
	for _, input := range inputs {
		select {
		case workChannel <- input:
		case err = <-errChannel:
			break loop
		}
	}

	close(workChannel) //关闭workChannel,可以让工作协程,在处理完当前任务后退出
	wg.Wait()

	// 关于select case,如果有多个case满足时,会选择随机进入一个case的,所以需要再检查一次,双重保险
	if err == nil {
		select {
		case err = <-errChannel:
		default:
		}
	}

	return outputs, err

代码看着是多了些,但在实际使用过程中,按需要改下worker函数输入和输出的类型即可。

如果 copy/paste 也不想做,那就只能封装一下了,但是因为现在 golang 还没正式推出范型,只能用inteface{}了,看着是不大好看,使用时,也要自己转来转去的,不过可以凑合着用啦。

话说,封装好的代码在这 parallel_runner.go,需要的自取了。

关于 context

也许有人会问,为什么不用context.WithCancel(),然后在出现错误的时候 cancel 一下? 讲究一点的话,确实应该用,但那也意味在在worker函数中,你也要检查ctx.Done(), 我不用还是因为懒了……

不过,我在封装后的代码parallel_runner.go里,是有加入 context 啦,看着就是更复杂了些……

写在最后

之前还写过一些关于 channel 的使用的文章,

里面实现的轻量级 util 都开源在channelx,欢迎大家审阅,如果有你喜欢用的工具,欢迎点个赞或者 star :)