二.并发
多路复用
select {
case ret := <-retCh1:
t.Logf("result %s", ret)
case ret := <-retCh2:
t.Logf("result %s", ret)
default:
t.Error("no one returned")
}
如果执行到select时没有收到channel的消息的时候,如果有default就会执行default
超时
select {
case ret := <-retCh1:
t.Logf("result %s", ret)
case <-time.After(time.Second * 1) // 超时
t.Error("time out")
}
channel的关闭与广播
- 向关闭的channel发送数据, 会导致panic
- v, ok <- ch; ok 为bool值, true表示正常接收, false表示通道关闭
- 所有的channel接收者都会在channel关闭时,立刻从阻塞等待中返回且上述ok值为false。这个广播机制被利用,进行向多个 订阅者同时发送信号。例如:退出信号
- 接收已经关闭的通道,将获取该通道类型的默认值
任务的取消
package cancel_task
import (
"fmt"
"testing"
"time"
)
func isCancelled(cancelChan chan struct{}) bool {
select {
case <-cancelChan:
return true
default:
return false
}
}
func cancel_1(cancelChan chan struct {}) {
cancelChan <- struct {} {
}
}
func cancel_2(cancelChan chan struct {}) {
close(cancelChan)
}
func TestCancel(t *testing.T) {
cancelChan := make(chan struct {}, 0)
for i := 0; i < 5; i++ {
go func(i int, cancelCh chan struct{}) {
for {
if isCancelled(cancelChan) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled()")
}(i, cancelChan)
}
//cancel_1(cancelChan) // 此方法不可取
cancel_2(cancelChan) // 广播机制
time.Sleep(time.Millisecond * 1)
}
context与任务取消
- 根Context:通过context.Background() 创建
- 子 Context: context.WithCancel(parentContext) 创建
- ctx, cancel := context.WithCancel(context.Background())
- 当前Context被取消时,基于他的子context都会被取消
- 接收取消通知 <-ctx.Done()
package cancel_task
import (
"context"
"fmt"
"testing"
"time"
)
func isCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
func TestCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 5; i++ {
go func(i int, ctx context.Context) {
for {
if isCancelled(ctx) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled()")
}(i, ctx)
}
cancel()
time.Sleep(time.Millisecond * 1)
}
仅运行一次
sync.once
package once_test
import (
"fmt"
"sync"
)
type SingletonObj struct {
Id string
}
var once sync.Once
var obj *SingletonObj
func GetSingletonObj() *SingletonObj {
once.Do(func() {
fmt.Println("Create singleton obj")
obj = &SingletonObj{}
})
return obj
}
仅需任意任务完成
package one_ok
import (
"fmt"
"runtime"
"testing"
"time"
)
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("The result is from %d", id)
}
func FirstResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner) // 使用buffer channel 防止协程泄露
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
return <-ch
}
func TestFirstResponse(t *testing.T) {
t.Log("Befor:", runtime.NumGoroutine())
t.Log(FirstResponse())
time.Sleep(time.Second * 1)
t.Log("After:", runtime.NumGoroutine())
}
所有任务都完成
package mul_ok
import (
"fmt"
"runtime"
"testing"
"time"
)
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("The result is from %d", id)
}
func AllResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner) // 使用buffer channel 防止协程泄露
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
finalRet := ""
for j:=0;j< numOfRunner;j++ {
finalRet += <-ch + "\n"
}
return finalRet
}
func TestFirstResponse(t *testing.T) {
t.Log("Befor:", runtime.NumGoroutine())
t.Log(AllResponse())
time.Sleep(time.Second * 1)
t.Log("After:", runtime.NumGoroutine())
}
对象池
使用buffer channel实现对象池
package pool_test
import (
"errors"
"fmt"
"testing"
"time"
)
type ReusableObj struct {
}
type ObjPool struct {
bufChan chan *ReusableObj
}
func NewObjPool(numOfObj int) *ObjPool {
objPool := ObjPool {}
objPool.bufChan = make(chan * ReusableObj, numOfObj)
for i := 0; i < numOfObj; i++ {
objPool.bufChan <- &ReusableObj{
}
}
return &objPool
}
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
select {
case ret := <- p.bufChan:
return ret, nil
case <-time.After(timeout):
return nil, errors.New("time out")
}
}
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
select {
case p.bufChan <- obj:
return nil
default:
return errors.New("overflow")
}
}
func TestObjPool(t *testing.T) {
pool := NewObjPool(10)
for i := 0; i < 11; i++ {
if v, err := pool.GetObj(time.Second + 1); err != nil {
t.Error(err)
} else {
fmt.Printf("%T\n", v)
if err := pool.ReleaseObj(v); err != nil {
t.Error(err)
}
}
}
}
sync.Pool对象缓存
- 尝试从私有对象获取
- 私有对象不存在,尝试从当前Processor的共享池获取
- 如果当前Processor共享池也是空的,那么就尝试去其他Processor的共享池获取
- 如果所有子池都是空的,最后就用用户指定的New函数产生一个新的对象返回
私有对象: 协程安全
共享池: 协程不安全
sync.Pool对象的放回
- 如果私有对象不存在则保存为私有对象
- 如果私有对象存在,放入当前Processor子池的共享池中
pool := &sync.Pool{
New: func() interface{} {
return 0
},
}
arry := pool.Get().(int)
...
pool.Put(10)
sync.Pool对象的生命周期
- GC会清除sync.pool缓存的对象
- 对象的缓存有效期为下一次GC之前
sync.Pool总结
- 适用于通过复用,降低复杂对象的创建和GC代价
- 协程安全, 会有锁的开销
- 生命周期受GC影响,不适合于做 连接池等,需要自己管理生命周期的资源的池化