Go 并发

Submitted by Lizhe on Thu, 08/24/2017 - 15:41

Go语言最大的一个特点就是对并发的原生支持

Go的并发指的是能让某个函数独立于其他函数运行的能力.

当一个函数创建为goroutine时, Go会将其视为一个独立的工作单元. 这个单元会被调度到任何可用的逻辑处理上执行.

Go的运行时调度器能管理被创建的goroutine并为他们分配执行时间.

Go基于叫做CSP ( Communicating Sequential Processes ) 的并发同步模型来保证数据的线程安全, CSP是一种消息传递模型, 通过在goroutine之间传递数据来传递消息,而不是对数据进行加锁来来实现同步访问, 用于传递数据的关键数据类型叫做channel

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func main(){
    runtime.GOMAXPROCS(1)
    
    var wg sync.WaitGroup
    wg.Add(2)
    
    fmt.Println("start")
    
    go func(){
        defer wg.Done()
        for i:=0;i<100;i++{
            fmt.Println("go1:",i)
        }
    }()
    
    go func(){
        defer wg.Done()
        for i:=0;i<100;i++{
            fmt.Println("go2:",i)
        }
    }()
    
    fmt.Println("waiting to finish")
    wg.Wait()
    
    fmt.Println("Done")
}
 

上面说到Go语言的运行时会在逻辑处理器上调度goroutine来运行, 每个逻辑处理器都分别绑定到单个操作系统线程

1.5 之前的版本只创建一个逻辑处理器, 1.5及其之后的版本会为每个可用的物理处理器各创建一个

如果创建一个goroutine并准备运行, 这个goroutine就会被放到调度器的全局运行队列中.

之后调度器就将这些队列中的goroutine分配给一个逻辑处理器,并放到这个逻辑处理器对应的本地运行队列中, 本地运行队列中的goroutine会一直等待直到自己被分配的逻辑处理器执行

470

如果正在运行的goroutine需要执行一个阻塞的系统调用,比如打开一个文件.

线程和goroutine会从逻辑处理器上分离, 该线程会继续阻塞,等待系统调用返回, 与此同时, 这个逻辑处理器就失去了原来的线程, 所以调度器会创建一个新线程并将其绑定到该逻辑处理器上

一旦阻塞结束, 对应的goroutine会放回到本地队列, 之前的线程会保存好,以便之后继续使用

如果是一个网络IO阻塞, 情况会有一些不同, goroutine会从逻辑处理器分离,并移动到集成了网络轮询器的运行时, 阻塞结束时goroutine会被重新分配到逻辑处理器上

 

471

实际上,一个goroutine并不相当于一个线程,goroutine的出现正是为了替代原来的线程概念成为最小的调度单位。一旦运行goroutine时,先去当先线程查找,如果线程阻塞了,则被分配到空闲的线程,如果没有空闲的线程,那么就会新建一个线程。注意的是,当goroutine执行完毕后,线程不会回收推出,而是成为了空闲的线程。

这里的并发(concurrency) 和 并行(parallelism)是有区别的

并发指的是多个线程在同一处理器上被轮询执行

并行则是在多个处理器上同时执行

 

 

接下来我们看看线程安全的问题

竞争状态

如果两个或者多个goroutine在没有相互同步的情况下,访问某个共享资源,并试图同时读写这个资源, 就处于相互竞争的状态

插播几句废话

刚开始学习go的时候因为读了一些相关说明, 对go的并发模型期待很高, 认为它会比java的并发模型先进很多

其实还是挺失望的, 对于线程不安全的状态python使用GIL, java则使用同步或者CAS, go其实并没有对java的模型进行质变

还是一个套路

如果你运行下面的java代码,每次运行你都会得到不同的值例如19或者20


public class App {

    static int result = 0;

    public static void add() {
        result++;
    }

    public static void main(String[] args) throws InterruptedException {

        Thread t1 = null;
        Thread t2 = null;

        t1 = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                add();
                System.out.println("t1:" + result);
            }
        });
        t1.start();

        t2 = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                add();
                System.out.println("t2:" + result);
            }
        });
        t2.start();

        t1.join();
        t2.join();

        System.out.println(result);

    }

}
 

输出错误结果19时的输出为 , 可以看到第二次和第三次结果两个线程计算出了相同的结果2

t2:1
t2:2
t1:2

t2:3
t1:4
t2:5
t2:6
t2:8
t1:8
t2:9
t1:10
t2:11
t2:12
t1:14
t2:14
t1:15
t1:16
t1:17
t1:18
t1:19
19
 

 

因为java的线程会使用变量的副本进行运算, 所以上面的代码会得到一个经典的线程不安全模型

同样的代码在go中则没有出现同样的情况, 说明go在并发时好像是不持有变量副本的 

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

var result = 0;

func main(){
    runtime.GOMAXPROCS(runtime.NumCPU())
    
    var wg sync.WaitGroup
    wg.Add(2)
    
    fmt.Println("start")
    
    
    
    go func(){
        defer wg.Done()
        for i:=0;i<100;i++{
            time.Sleep(1000)
            fmt.Println("go1:",i)
            result++
            fmt.Println("result1:",result)
        }
    }()
    
    go func(){
        defer wg.Done()
        for i:=0;i<100;i++{
            time.Sleep(1000)
            fmt.Println("go2:",i)
            result++
            fmt.Println("result2:",result)
        }
    }()
    
    fmt.Println("waiting to finish")
    wg.Wait()
    
    fmt.Println("Done:",result)
}
 

上面的代码会一直输出结果为200

start
waiting to finish
go1: 0
result1: 1
go2: 0
result2: 2
go2: 1
result2: 3
go1: 1
result1: 4
go1: 2
result1: 5
go2: 2
result2: 6
go2: 3
result2: 7
go1: 3
result1: 8
go1: 4
result1: 9
go2: 4
result2: 10
go2: 5
result2: 11
go1: 5
result1: 12
...
...
result1: 196
go1: 98
result1: 197
go2: 98
result2: 198
go2: 99
result2: 199
go1: 99
result1: 200
Done: 200
 

 

如果要像java那样操作,需要对代码进行一些小改动

下面这个例子会和java一样, 每个goroutine都会创造一个变量的副本,然后在调用结束之前一直使用自己的副本,这样也就忽略了其他goroutine修改后的值

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

var result = 0;

func main(){
    runtime.GOMAXPROCS(runtime.NumCPU())
    
    var wg sync.WaitGroup
    wg.Add(2)
    
    fmt.Println("start")
    
    go func(){
        defer wg.Done()
        for i:=0;i<10;i++{
            time.Sleep(1000)
            fmt.Println("go1:",i)
            temp:=result
            runtime.Gosched()
            temp++
            result=temp
            fmt.Println("result1:",result)
        }
    }()
    
    go func(){
        defer wg.Done()
        for i:=0;i<10;i++{
            time.Sleep(1000)
            fmt.Println("go2:",i)
            temp:=result
            runtime.Gosched()
            temp++
            result=temp
            fmt.Println("result2:",result)
        }
    }()
    
    fmt.Println("waiting to finish")
    wg.Wait()
    
    fmt.Println("Done:",result)
}
 

 

输出

start
waiting to finish
go1: 0
go2: 0
result1: 1
result2: 1
go2: 1
go1: 1
result2: 2
result1: 2
go1: 2
go2: 2
result1: 3
result2: 3
go2: 3
result2: 4
go1: 3
result1: 5
go1: 4
go2: 4
result1: 6
result2: 6
go2: 5
go1: 5
result2: 7
result1: 7
go1: 6
go2: 6
result1: 8
result2: 8
go2: 7
go1: 7
result2: 9
result1: 9
go1: 8
go2: 8
result1: 10
result2: 10
go2: 9
go1: 9
result2: 11
result1: 11
Done: 11
 

解决办法其实也和java差不多

 

基于atomic实现

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
    "sync/atomic"
)

var result int64 = 0;

func main(){
    runtime.GOMAXPROCS(runtime.NumCPU())
    
    var wg sync.WaitGroup
    wg.Add(2)
    
    fmt.Println("start")
    
    go func(){
        defer wg.Done()
        for i:=0;i<10;i++{
            time.Sleep(1000)
            fmt.Println("go1:",i)
            runtime.Gosched()
            atomic.AddInt64(&result, 1);
            fmt.Println("result1:",result)
        }
    }()
    
    go func(){
        defer wg.Done()
        for i:=0;i<10;i++{
            time.Sleep(1000)
            fmt.Println("go2:",i)
            runtime.Gosched()
            atomic.AddInt64(&result, 1);
            fmt.Println("result2:",result)
        }
    }()
    
    fmt.Println("waiting to finish")
    wg.Wait()
    
    fmt.Println("Done:",result)
}

 

基于通道(channel)

当一个资源需要在goroutine之间共享时, 通道在goroutine之间架起了一个管道, 并提供了确保同步交换数据的机制

声明通道时, 需要指定将要被共享的数据类型.

可以通过通道共享  内置类型, 命名类型, 结构类型 和 引用类型的值或者指针

在go语言中需要使用内置函数 make 来创建一个通道

unbuffered := make( chan int )

buffered := make( chan string,10 )

向通道发送值

buffered <- "the mssages"

从通道接收一个字符串

value := <-buffered

一个简单的channel例子

package main

import "fmt"

func main() {

    messages := make(chan string)

    go func() { messages <- "ping" }()

    msg := <-messages
    fmt.Println(msg)
}
 

 

channel的工作方式有些类似于生产者和消费者, 无缓存的channel 无论是 读 还是 写 都会造成阻塞

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

var result int = 0

func main() {

    runtime.GOMAXPROCS(runtime.NumCPU())
    var wg sync.WaitGroup
    wg.Add(2)

    messages := make(chan int)

    go func() {
        defer wg.Done()
        for i := 0; i < 10; i++ {
            result++
            messages <- result
        }

    }()

    go func() {
        defer wg.Done()
        for i := 0; i < 10; i++ {
            time.Sleep(1 * time.Second)
            result := <-messages
            fmt.Println(result)
        }

    }()

    fmt.Println("waiting to finish")
    wg.Wait()
    fmt.Println(result)
}
 

 

最后我们用两个channel 实现一下最开始的 双线程 计数器

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    messages := make(chan int)
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        for {
            time.Sleep(1 * time.Second)
            result, ok := <-messages
            if result >= 5 {
                close(messages)
                return
            }
            if ok {
                result++
                fmt.Println("1:", result, ok)
                messages <- result
            } else {
                return
            }

        }

    }()

    go func() {
        defer wg.Done()
        for {
            time.Sleep(1 * time.Second)
            result, ok := <-messages
            if result >= 5 {
                close(messages)
                return
            }
            if ok {
                result++
                fmt.Println("2:", result, ok)
                messages <- result
            } else {
                return
            }
        }

    }()

    messages <- 0

    fmt.Println("waiting to finish")
    wg.Wait()
}
 

输出是

waiting to finish
1: 1 true
2: 2 true
1: 3 true
2: 4 true
1: 5 true