channel

  • 不要通过共享内存来通信,要通过通信来共享内存
  • channel是用来协程通信的
     
  • 错误做法
package main

import "fmt"

func chanDemo() {
   
     
	//	var c chan int // chan  通道里面是int类型  c == nil
	c := make(chan int) // 这里定义没有缓存,所以接收到一个他就会阻塞的到这个值被取走

	c <- 1 // 把 1 送到c中
	c <- 2
	c <- 3
	n:= <-c
	//fatal error: all goroutines are asleep - deadlock!
	//这样因为没人其他的协程接这个通道的数据 所以报错 死锁
	fmt.Println(n)
}

func main() {
   
     
	chanDemo()
}

  • 开一个协程接收数据

func chanDemo() {
   
     
	c := make(chan int)
	go func() {
   
     
		for {
   
     
			n := <-c
			fmt.Println(n)
			//1
			//2
			//3

		}
	}()
	c <- 1 // 把 1 送到c中
	c <- 2
	c <- 3
	time.Sleep(time.Millisecond)
}

func main() {
   
     
	chanDemo()
}

// chan 作为参数
func worker(c chan int) {
   
     
	for {
   
     
		//n := <-c
		//fmt.Println(n)
		fmt.Printf("%c",<-c)

	}
}
func chanDemo() {
   
     
	var channels [10]chan int // 数组
	for i:=0;i<10;i++{
   
     
		channels[i] = make(chan int)
		go worker(channels[i])
	}

	for i:=0;i<10;i++{
   
     
		channels[i] <- 'a'+i
	}
	time.Sleep(time.Millisecond)
}

//func createWorker(id int) <-chan int { //这样写表示只用来发送数据 
func createWorker(id int) chan<- int {
   
      //这样写表示只用来接收数据 用于发送数据会报错
	c := make(chan int)
	go func() {
   
     
		for {
   
     
			fmt.Printf("worker %d receiver:%c\n", id, <-c)

		}
	}()
	return c
}
func chanDemo() {
   
     
	var channels [10]chan<- int // 数组
	for i := 0; i < 10; i++ {
   
     
		channels[i] = createWorker(i)
	}

	for i := 0; i < 10; i++ {
   
     
		channels[i] <- 'a' + i
	}
	time.Sleep(time.Millisecond)
}
/、
func bufferedChannel() {
   
     
	c := make(chan int, 3) //定义缓冲区
	//因为有大小为3的缓冲区 所以就算没有协程来接,传入数据不大于3个是不会报错的
	c <- 1
	c <- 2
	c <- 3
}

func main() {
   
     
	bufferedChannel()
	//chanDemo()
}

//
close(c) // 通道也可以关闭 通道关闭之后,另一边还是可以接收到数据,是这个类型的默认0值
	// 可以用两个数来接收 n, ok := <-c ok表示是否有值 然后用if ok 做判断
	// 也可以用for n:= range c 用range接收数据 等到c发完就跳出来
	// 如果c不close 会永远接收下去,等到main退出,他也就退出,chan可以不关闭

  • channel
  • buffered channel
  • range
  • 发送数据c`<-1
  • 接收数据n:=`<-c
  • chan<-int只能发数据给channel<-chan int只能从channel接收数据
  • close(c):关闭channel,发送方一旦关闭channel,接收方一直能接收到数据,但是收到的是channel中具体类型的零值
  • 只有发送方才能close channel,range会自动检查channel是否close
  • 如果发送方没有使用close方法,接收方进入循环后会阻塞,也就是说,一个chan如果有close方法,那就是非阻塞的,没有则是阻塞的

使用Channel等待任务结束

方法一

// done用于接收,这样来替换 	time.Sleep(time.Millisecond)
type workers struct {
   
     
	in   chan int
	done chan bool
}

func createWorker(id int, w workers) {
   
     
	go func() {
   
     
		for {
   
     
			fmt.Printf("worker %d receiver:%c\n", id, <-w.in)
			w.done <- true //打印完就送一个true给done

		}
	}()
}

func chanDemo() {
   
     
	var work [10]workers // 数组
	for i := 0; i < 10; i++ {
   
     
		work[i] = workers{
   
     
			in:   make(chan int),
			done: make(chan bool),
		}
		createWorker(i, work[i])
	}
	for i := 0; i < 10; i++ {
   
     
		work[i].in <- 'a' + i
		<- work[i].done // 如果done没有数据 这里会阻塞
	}

}

func main() {
   
     
	chanDemo()
}

缺点:顺序打印,不如不用

worker 0 receiver:a
worker 1 receiver:b
worker 2 receiver:c
worker 3 receiver:d
worker 4 receiver:e
worker 5 receiver:f
worker 6 receiver:g
worker 7 receiver:h
worker 8 receiver:i
worker 9 receiver:j

  • 解决方式
func chanDemo() {
   
     
	var work [10]workers // 数组
	for i := 0; i < 10; i++ {
   
     
		work[i] = workers{
   
     
			in:   make(chan int),
			done: make(chan bool),
		}
		createWorker(i, work[i])
	}
	for i := 0; i < 10; i++ {
   
     
		work[i].in <- 'a' + i
	}
	for _, worker := range work {
   
     
		<-worker.done
	}

}

通过waitGroup等待

type workers struct {
   
     
	in   chan int
	wg *sync.WaitGroup //指针才能使用同一个wg
}

func createWorker(id int, wg *sync.WaitGroup) workers{
   
     
	w := workers{
   
     
		in : make(chan int),
		wg : wg,
	}
	go func() {
   
     
		for {
   
     
			fmt.Printf("worker %d receiver:%c\n", id, <-w.in)
			wg.Done()
		}
	}()
	return w
}

func chanDemo() {
   
     
	var wg sync.WaitGroup
	wg.Add(20) //20个任务
	var work [10]workers // 数组
	for i := 0; i < 10; i++ {
   
     
		work[i] = createWorker(i, &wg) //所有的要用同一个wg 所以传指针
	}
	for i := 0; i < 10; i++ {
   
     
		work[i].in <- 'a' + i
	}
	for i := 0; i < 10; i++ {
   
     
		work[i].in <- 'A' + i
	}
	wg.Wait() //等待20个任务完成再往下执行
}

func main() {
   
     
	chanDemo()
}

使用Select来进行调度

func main() {
   
     

	var c1, c2 chan int // nil 
	select {
   
      //哪一个条件满足就走哪个条件
	case n := <-c1:
		fmt.Println("received from c1:", n)
	case n := <-c2:
		fmt.Println("received from c2:", n)
	default:
		fmt.Println("no value") //这里 
	}
}
/
func generator() chan int {
   
     
	c := make(chan int)
	i := 0
	go func() {
   
     
		for {
   
     
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(1500)))
			c <- i
			i++
		}
	}()
	return c
}

func main() {
   
     
	var c1, c2  = generator(), generator()
	for  {
   
     
	select {
   
      //哪一个条件满足就走哪个条件
	case n := <-c1:
		fmt.Println("received from c1:", n)
	case n := <-c2:
		fmt.Println("received from c2:", n)
	}

	}
}
//
func main() {
   
     
	var c1, c2 = generator(), generator()
	w := createWorker(0)
	var n int
	for {
   
     
		select {
   
      //哪一个条件满足就走哪个条件
		case n = <-c1:
			fmt.Println("received from c1:", n)
		case n = <-c2:
			fmt.Println("received from c2:", n)
		case w <- n: //也可以发 但是因为w不是nil 如果n没有值他会输出零值 这里就会无限循环走这个逻辑
		}
	}
}
///解决办法利用chan的nil值

func createWorker(id int) chan int {
   
     
	w := make(chan int)
	go func() {
   
     
		for {
   
     
			fmt.Printf("worker %d receiver:%d\n", id, <-w)
		}
	}()
	return w
}
func generator() chan int {
   
     
	c := make(chan int)
	i := 0
	go func() {
   
     
		for {
   
     
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(1500)))
			c <- i
			i++
		}
	}()
	return c
}

func main() {
   
     
	var c1, c2 = generator(), generator()
	w := createWorker(0)
	var n int
	hasValue := false
	for {
   
     
		var activeWork chan int
		if hasValue {
   
     
			activeWork = w
		}
		select {
   
      //哪一个条件满足就走哪个条件
		case n = <-c1:
			hasValue = true
			fmt.Println("received from c1:", n)
		case n = <-c2:
			hasValue = true
			fmt.Println("received from c2:", n)
		case activeWork <- n: //这样的话如果hasValue为false 这里activeWork为nil这样就会被阻塞住
			hasValue = false // 但是这样数据会丢失因为接收和发送的速度不匹配,可以弄一个队列然后存到队列中去 然后hasValue的判断规则为队列长度是否大于0
		}
	}
}
//

func main() {
   
     
	var c1, c2 = generator(), generator()
	w := createWorker(0)
	var n int
	var values []int
	for {
   
     
		var activeWork chan int
		if len(values) > 0 {
   
     
			activeWork = w
		}
		select {
   
      //哪一个条件满足就走哪个条件
		case n = <-c1:
			values = append(values, n)
			fmt.Println("received from c1:", n)
		case n = <-c2:
			values = append(values, n)
			fmt.Println("received from c2:", n)
		case activeWork <- n: 
			values = values[1:] //弄一个队列然后存到队列中去 然后hasValue的判断规则为队列长度是否大于0
		}
	}
}

计时器的使用

func main() {
   
     
	var c1, c2 = generator(), generator()
	w := createWorker(0)
	var n int
	var values []int
	tm := time.After(10 * time.Second) //10s过后,会往tm通道发送数据
	for {
   
     
		var activeWork chan int
		if len(values) > 0 {
   
     
			activeWork = w
		}
		select {
   
      //哪一个条件满足就走哪个条件
		case n = <-c1:
			values = append(values, n)
			fmt.Println("received from c1:", n)
		case n = <-c2:
			values = append(values, n)
			fmt.Println("received from c2:", n)
		case activeWork <- n:
			values = values[1:] //弄一个队列然后存到队列中去 然后hasValue的判断规则为队列长度是否大于0
		case <-tm:
			fmt.Println("bye") //10s过后bye
			return
		}
	}
}
  • Select的使用
  • 定时器的使用
  • 在Select中使用nil channel

传统的同步机制(尽量少用)

  • waitGroup
  • mutex
  • Cond
type atomicInt struct {
   
     
	value int
	lock sync.Mutex
}
func (a *atomicInt) get() int{
   
     
	a.lock.Lock()
	defer a.lock.Unlock()
	return a.value
}
func (a *atomicInt) increment() {
   
     
	a.lock.Lock()
	defer a.lock.Unlock() //defer的正确用法
	a.value++
}
func main() {
   
     
	var a atomicInt
	a.increment()
	go func() {
   
     
		a.increment()
	}()
	time.Sleep(time.Millisecond)
	fmt.Println(a.get())
}

// 如果要在一段代码端同步,那就变成一个匿名函数就好了
func (a *atomicInt) increment() {
   
     
	func() {
   
     
		a.lock.Lock()
		defer a.lock.Unlock() //defer的正确用法
		//............
		a.value++
		//.........
	}()
}