并发问题

概述

常见并发报错:

  • fatal error:concurrent map read and map write

Go 并发操作变量

常见错误使用 goroutine

func goroutineRun(values []int)  {
    for value := range values {
        go value.test()
    }
}
func goroutineRun(values []int)  {
    for value := range values {
        go func() {
            fmt.Println(value)
        }()
    }
}

这两段代码实际上是遍历数组的所有变量。由于闭包只是绑定到这个 value 变量上,并没有被保存到 goroutine 栈中,所以以上代码极有可能运行的结构都输出为切片的最后一个元素。因为这样写会导致 for 循环结束后才执行 goroutine 多线程操作,这时候 v alue 值只指向了最后一个元素。这样的结果不是我们所希望的,而且还会产生并发的资源抢占冲突所以是非常不推荐这样写的。

解决方法

for val := range values {
    go func(val interface{}) {
        fmt.Println(val)
    }(val)
}

在这里将 val 作为一个参数传入 goroutine 中,每个 val 都会被独立计算并保存到 goroutine 的栈中,从而得到预期的结果。 另一种方法是在循环内定义新的变量,由于在循环内定义的变量在循环遍历的过程中是不共享的,因此也可以达到同样的效果:

for i := range valslice {
    val := valslice[i]
    go func() {
        fmt.Println(val)
    }()
}

Go 中并发读写问题及解决方法

map 并发读写问题

如果 map 由多协程同时读和写就会出现 fatal error:concurrent map read and map write 相关的错误

如下代码很容易就出现 map 并发读写问题,报错:fatal error: concurrent map read and map write

func main() {
	c := make(map[string]int)
	go func() { //开一个协程写map
		for j := 0; j < 1000000; j++ {
			c[fmt.Sprintf("%d", j)] = j
		}
	}()
	go func() { //开一个协程读map
		for j := 0; j < 1000000; j++ {
			fmt.Println(c[fmt.Sprintf("%d", j)])
		}
	}()
	time.Sleep(time.Second * 20)
}

如下代码很容易出现 map 并发写的问题,报错:fatal error: concurrent map writes

func main() {
	c := make(map[string]int)
	for i := 0; i < 100; i++ {
		go func() { //开100个协程并发写map
			for j := 0; j < 1000000; j++ {
				c[fmt.Sprintf("%d", j)] = j
			}
		}()
	}
	time.Sleep(time.Second * 20) //让执行main函数的主协成等待20s,不然不会执行上面的并发操作
}

出现问题的原因

因为 map 为引用类型,所以即使函数传值调用,参数副本依然指向映射 m, 所以多个 goroutine 并发写同一个映射 m, 写过多线程程序的同学都知道,对于共享变量,资源,并发读写会产生竞争的, 故共享资源遭到破坏

解决方法

加锁

(1)通用锁

type Demo struct {
	Data map[string]string
	Lock sync.Mutex
}

func (d Demo) Get(k string) string {
	d.Lock.Lock()
	defer d.Lock.UnLock()
	return d.Data[k]
}

func (d Demo) Set(k, v string) {
	d.Lock.Lock()
	defer d.Lock.UnLock()
	d.Data[k] = v
}

(2)读写锁

type Demo struct {
	Data map[string]string
	Lock sync.RwMutex
}

func (d Demo) Get(k string) string {
	d.Lock.RLock()
	defer d.Lock.RUnlock()
	return d.Data[k]
}

func (d Demo) Set(k, v string) {
	d.Lock.Lock()
	defer d.Lock.UnLock()
	d.Data[k] = v
}

利用 channel 串行化处理

Golang 并发操作变量需要注意的问题

参考:

要记得 golang 中变量的赋值不是并发安全的

什么是并发安全

我理解的并发安全就是当并发和不并发的情况下执行结果是一致的.

比如 count++, ok = !ok, 在非并发下很好理解, 而在并发情况下却容易出现预期之外的结果, 这样的代码就是非并发安全的.举个例子:

count++ 其实是分成两步执行的. 当分成了两步执行, 那么其他协程就可以趁着这个时间间隙作怪.如一下 ab 两个协程同时 count++

count:= 1
a > 读取count : 1
b > 读取count : 1
a > 计算count+1 : 2
b > 计算count+1 : 2
a > 赋值count : 2
b > 赋值count : 2

这就会发生明明 ab 协程计算了两次, 可结果还是 2.

赋值一个简单的 count 都会出现偏差, 那么赋值一个更为复杂的结构体会不会有问题呢?

例如以下代码, 会进入 x.Y != x.X 判断分支(概率低 但总会发生). 如果还有其他协程再去读 x 变量, 则会引发逻辑错误.

func TestX(t *testing.T) {
    x := struct {
        X string
        Y string
    }{}

    for i := 0; i < 300000; i++ {
        go func() {
            y := strconv.FormatInt(int64(i), 10)
            x = struct {
                X string
                Y string
            }{
                X: y,
                Y: y,
            }
            if x.Y != x.X {
                t.Log("-----", x)
            }
        }()
    }

    time.Sleep(1 * time.Second)

    t.Log(x)
}

可以想到,在结构体中有多个字段, a 协程赋值了一些字段(x 字段), b 协程赋值了一些字段(y 字段), 此时的整个结构体既不是 a 协程想要的数据, 也不是 b 协程想要的数据.

如何解决这个问题呢: 使用 atomic.Value

func TestY(t *testing.T) {
    v := atomic.Value{}
    for i := 0; i < 300000; i++ {
        go func() {
            y := strconv.FormatInt(int64(i), 10)

            v.Store(struct {
                X string
                Y string
            }{
                X: y,
                Y: y,
            })

            x := v.Load().(struct {
                X string
                Y string
            })
            if x.Y != x.X {
                t.Log("-----", x)
            }
        }()
    }

    time.Sleep(1 * time.Second)

    t.Log(v.Load())
}

这时候就不会进入 x.Y != x.X 分支了.

loop variable bucket captured by func literal

https://www.codenong.com/40326723/

go vet range variable captured by func literal when using go routine inside of for each loop

我不太确定" funcliteral"是什么,因此这个错误使我有些困惑。 我想我已经看到了问题-我是从新的 go 例程中引用一个范围值变量,因此该值可能随时更改,而不是我们期望的值。 解决问题的最佳方法是什么? 有问题的代码:

func (l *Loader) StartAsynchronous() []LoaderProcess {
    for _, currentProcess := range l.processes {
        cmd := exec.Command(currentProcess.Command, currentProcess.Arguments...)
        log.LogMessage("Asynchronously executing LoaderProcess: %+v", currentProcess)
        go func() {
            output, err := cmd.CombinedOutput()
            if err != nil {
                log.LogMessage("LoaderProcess exited with error status: %+v\
 %v", currentProcess, err.Error())
            } else {
                log.LogMessage("LoaderProcess exited successfully: %+v", currentProcess)
                currentProcess.Log.LogMessage(string(output))
            }
            time.Sleep(time.Second * TIME_BETWEEN_SUCCESSIVE_ITERATIONS)
        }()
    }
    return l.processes
}

我建议的修正:

func (l *Loader) StartAsynchronous() []LoaderProcess {
    for _, currentProcess := range l.processes {
        cmd := exec.Command(currentProcess.Command, currentProcess.Arguments...)
        log.LogMessage("Asynchronously executing LoaderProcess: %+v", currentProcess)
        localProcess := currentProcess
        go func() {
            output, err := cmd.CombinedOutput()
            if err != nil {
                log.LogMessage("LoaderProcess exited with error status: %+v\
 %v", localProcess, err.Error())
            } else {
                log.LogMessage("LoaderProcess exited successfully: %+v", localProcess)
                localProcess.Log.LogMessage(string(output))
            }
            time.Sleep(time.Second * TIME_BETWEEN_SUCCESSIVE_ITERATIONS)
        }()
    }
    return l.processes
}

但这真的可以解决问题吗? 我刚刚将引用从范围变量移到了一个不同的局部变量,其值基于我所参与的每个循环的迭代。


别担心,这是 Go 中新手经常遇到的错误,是的,每个循环都会改变 var currentProcess,因此您的 goroutine 将使用 slice l.processes 中的最后一个进程,您要做的就是将变量传递为匿名函数的参数,如下所示:

func (l *Loader) StartAsynchronous() []LoaderProcess {

    for ix := range l.processes {

        go func(currentProcess *LoaderProcess) {

            cmd := exec.Command(currentProcess.Command, currentProcess.Arguments...)
            log.LogMessage("Asynchronously executing LoaderProcess: %+v", currentProcess)

            output, err := cmd.CombinedOutput()
            if err != nil {
                log.LogMessage("LoaderProcess exited with error status: %+v\
 %v", currentProcess, err.Error())
            } else {
                log.LogMessage("LoaderProcess exited successfully: %+v", currentProcess)
                currentProcess.Log.LogMessage(string(output))
            }

            time.Sleep(time.Second * TIME_BETWEEN_SUCCESSIVE_ITERATIONS)

        }(&l.processes[ix]) // passing the current process using index

    }

    return l.processes
}

对于那些寻找简单示例的人: 这是错误的:

func main() {
  for i:=0; i<10; i++{
    go func(){
        processValue(i)
    }()
  }
}

func processValue(i int){
  fmt.Println(i)
}

这不完全是一个错误,但可能会导致意外行为,因为控制循环的变量 i 可能会从其他 go 例程更改。实际上是 vet 命令,它会对此发出警报。 Go vet 可以精确地帮助您找到这种可疑的结构,它使用的启发式方法不能保证所有报告都是真正的问题,但是可以找到编译器未捕获的错误。因此,不时运行它是一个好习惯。 在运行代码之前,Go Playground 运行 go vet,您可以在此处看到实际操作。 这是对的:

func main() {
  for i:=0; i<10; i++{
    go func(differentI int){
        processValue(differentI)
    }(i)
  }
}

func processValue(i int){
  fmt.Println(i)
}

我故意将函数文字参数命名为 differentI,以使其明显是另一个变量。以这种方式进行并发使用是安全的,兽医不会抱怨,您不会有任何奇怪的行为。您可以在这里看到实际效果。 (由于在不同的 go 例程上完成了打印,因此您将看不到任何内容,但是程序将成功退出) 顺便说一句,func 文字基本上是一个匿名函数:)


是的,您所做的是正确解决此警告的最简单方法。 修复之前,只有一个变量,所有 goroutine 都在引用它。这意味着他们从开始时看不到值,而是当前值。在大多数情况下,这是该范围内的最后一个。

结语

在编写并发代码的时候一定记得处理这些问题(即便只是一个基本类型的变量): 可以通过加锁, 或者不使用并发读写的方式(使用 channel 或者 函数式编程).

主要是在使用 go 协程实现并发时,尤其注意上面最后两个简单的示例,尽量不要在协程中直接使用循环中的变量,而是将变量作为开启协程的匿名函数的参数,传递进来。

golang 中并发读写同一个变量会出现部分错乱吗?

参考:原文链接

譬如一个 int64 两个协程并发修改

那么会出现一个协程 A 修改了前面 32 位,
然后切换到另一个协程 B 把 64 位都修改了,
接着协程 A 继续修改后 32 位....
最终的结果是 B 决定了前面 32 位,A 决定了后面 32 位...数值上既不是 A 的结果,也不是 B 的结果

嗯, golang 里面的协程是自愿让渡的模式, 但是协程依赖的线程,依旧是操作系统的抢占是模式

也就是说有可能出现,一个协程修改某个值,修改到一半,被挂起, 所以是可能的,这样推论是否正确

int64 位应该没可能, 因为汇编里面有对应的 int64 类型的指令, 不过,如果是个 struct 结构…应该有可能出现这个问题

谢谢

控制并发数量

https://segmentfault.com/a/1190000017956396


最后修改 July 4, 2023: update (3b7c1d43)