Gosingleflight和backoff包介绍和组合使用
业务中我们经常遇到一些重复使用的轮子代码,本篇介绍了 singleflight 和 backoff 以及本地缓存!来提高我们平时业务开发的效率和代码的精简度! singleflight介绍源码位置: https://github.com/golang/groupcache/tree/master/singleflight 或者 golang.org/x/sync/singleflight 主要是google 开源的group cache 封装的sdk,目的是为了解决 cache 回源的时候,容易出现并发加载一个或者多个key,导致缓存击穿 其中 golang.org/x/sync/singleflight 它提供了更多方法,比如异步加载等!但是代码量增加了很多,比如很多异步的bug之类的! 简单使用简单模拟100个并发请求去加载 k1 package main import ( "fmt" "sync" "github.com/golang/groupcache/singleflight" ) var ( cache sync.Map sf = singleflight.Group{} ) func main() { key := "k1" // 假如现在有100个并发请求访问 k1 wg := sync.WaitGroup{} wg.Add(100) for x := 0; x < 100; x++ { go func() { defer wg.Done() loadKey(key) }() } wg.Wait() fmt.Printf("result key: %s ", loadKey(key)) } func loadKey(key string) (v string) { if data, ok := cache.Load(key); ok { return data.(string) } data, err := sf.Do(key, func() (interface{}, error) { data := "data" + "|" + key fmt.Printf("load and set success, data: %s ", data) cache.Store(key, data) return data, nil }) if err != nil { // todo handler panic(err) } return data.(string) } // output //load and set success, data: data|k1 //load and set success, data: data|k1 //result key: data|k1
可以看到输出中,其中有2次去 loadKeyFromRemote 去加载,并没有做到完全防止得到的作用如何解决上诉问题了,问题出在哪了?我们进行简单的源码分析 源码分析数据结构 // call is an in-flight or completed Do call type call struct { wg sync.WaitGroup val interface{} err error } // Group represents a class of work and forms a namespace in which // units of work can be executed with duplicate suppression. type Group struct { mu sync.Mutex // protects m m map[string]*call // lazily initialized // 懒加载 }主逻辑 func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { g.mu.Lock() // lock if g.m == nil { // 懒加载 g.m = make(map[string]*call) } // 如果key存在,则wait if c, ok := g.m[key]; ok { g.mu.Unlock() c.wg.Wait() return c.val, c.err } // new caller + wg add + set c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() // 调用方法 c.val, c.err = fn() // notify c.wg.Done() // 删除key,防止内存泄漏 g.mu.Lock() delete(g.m, key) g.mu.Unlock() return c.val, c.err }(1) 首先会去初始化一个 caller,然后waitgroup ++ ,然后set [锁] (2) 然后调用方法,再done [无锁] (3) 最后删除 key [锁] (4) 其他同一个key并发请求,会发现key存在,则直接wait了!
假如现在并发请求,那么此时假如都加载同一个key,那么只有一个key先经过,但是计算机执行的很快,在第(2)和(3)步执行的很快,导致key已经删除,但是还有请求未开始 Do 方法或者到了g.m[key] 这一步,都是会再次重新走一遍问题? 能不能使用读写锁优化加锁了?
假如读取key加的读锁,那么此时最长流程变为: 读锁 + 写锁 + 写锁, 最短流程变为: 读锁, 当特别高的并发才会有较为大的提升! 优化后用法package main import ( "fmt" "sync" "github.com/golang/groupcache/singleflight" ) var ( cache sync.Map sf = singleflight.Group{} ) func main() { key := "k1" // 假如现在有100个并发请求访问 k1 wg := sync.WaitGroup{} wg.Add(100) for x := 0; x < 100; x++ { go func() { defer wg.Done() loadKey(key) }() } wg.Wait() fmt.Printf("result key: %s ", loadKey(key)) } func loadKey(key string) (v string) { if data, ok := cache.Load(key); ok { return data.(string) } data, err := sf.Do(key, func() (interface{}, error) { if data, ok := cache.Load(key); ok { // 双重检测 return data.(string), nil } data := "data" + "|" + key fmt.Printf("load and set success, data: %s ", data) cache.Store(key, data) return data, nil }) if err != nil { // todo handler panic(err) } return data.(string) } // output //load and set success, data: data|k1 //result key: data|k1backoff介绍源码地址: github.com/cenkalti/backoff 主要是解决补偿的操作,当业务/方法遇到异常的情况,通常会有补偿的操作,一般就是业务继续重试 我经常使用这个包做重试,感觉比较好用!不用自己写for循环了 简单使用模拟一个异常,去加载一个data数据,当遇到偶数的时候就爆异常! package main import ( "fmt" "math/rand" "time" "github.com/cenkalti/backoff" ) func main() { var ( data interface{} ) if err := backoff.Retry(func() error { if rand.Int()%2 == 0 { // 模拟异常 err := fmt.Errorf("find data mod 2 is zero") fmt.Printf("find err, err: %s ", err) return err } data = "load success" return nil }, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Millisecond*1), 3)); err != nil { panic(err) } fmt.Printf("data: %s ", data) } //output //find err, err: find data mod 2 is zero //data: load success
结果可以看到很好的解决了重试的问题!代码很优雅! 关于为啥业务中重试都喜欢等待一下,其实比较佛学! sdk介绍back off type BackOff interface { // NextBackOff returns the duration to wait before retrying the operation, // or backoff. Stop to indicate that no more retries should be made. // 是否下一次,以及下一次需要等待的时间! NextBackOff() time.Duration // Reset to initial state. Reset() }封装了四个基本的Backoff // 不需要等待,继续重试 type ZeroBackOff struct{} func (b *ZeroBackOff) Reset() {} func (b *ZeroBackOff) NextBackOff() time.Duration { return 0 } // 不允许重试 type StopBackOff struct{} func (b *StopBackOff) Reset() {} func (b *StopBackOff) NextBackOff() time.Duration { return Stop } // 每次重试等待相同的时间 type ConstantBackOff struct { Interval time.Duration } func (b *ConstantBackOff) Reset() {} func (b *ConstantBackOff) NextBackOff() time.Duration { return b.Interval } func NewConstantBackOff(d time.Duration) *ConstantBackOff { return &ConstantBackOff{Interval: d} } // 重试back off,主要是计数重试的次数,以及基于委托代理模型,实现比较好的拓展 // max=0 会无限重试下去 func WithMaxRetries(b BackOff, max uint64) BackOff { return &backOffTries{delegate: b, maxTries: max} } type backOffTries struct { delegate BackOff maxTries uint64 numTries uint64 } func (b *backOffTries) NextBackOff() time.Duration { if b.maxTries > 0 { if b.maxTries <= b.numTries { return Stop } b.numTries++ } return b.delegate.NextBackOff() } func (b *backOffTries) Reset() { b.numTries = 0 b.delegate.Reset() }自适应backoff
整个时间 < 15min,重试时间从500ms开始增长,每次增长1.5倍,直到60s每次! // NewExponentialBackOff creates an instance of ExponentialBackOff using default values. func NewExponentialBackOff() *ExponentialBackOff { b := &ExponentialBackOff{ InitialInterval: DefaultInitialInterval, RandomizationFactor: DefaultRandomizationFactor, Multiplier: DefaultMultiplier, MaxInterval: DefaultMaxInterval, MaxElapsedTime: DefaultMaxElapsedTime, Clock: SystemClock, } b.Reset() return b }组合使用,构建一个本地缓存!
这个应该是日常开发中经常用到的,本地缓存可以有效解决高频数据但是数据整体占用并不是特别的大,但是每次加载都需要额外的开销,所以基于本地缓存去构建一个可用性比较高的缓存框架! 核心代码 package main import ( "context" "fmt" "time" "github.com/cenkalti/backoff" "golang.org/x/sync/singleflight" ) var ( localCacheCallbackIsNil = fmt.Errorf("cache callback func is nil") ) type CacheOption interface { } type Cache interface { Get(key string) (value interface{}, isExist bool) Set(key string, value interface{}, opts ...CacheOption) } type WrapperCache interface { GetData(ctx context.Context, key string, callback func(ctx context.Context) (interface{}, error)) (v interface{}, err error) } type wrapperCache struct { name string cache Cache singleflight singleflight.Group retrySleepTime time.Duration retryNum uint64 } func NewWrapperCache(name string, cache Cache) WrapperCache { return &wrapperCache{ name: name, cache: cache, retryNum: 3, retrySleepTime: time.Millisecond * 10, } } // emitHitCachedMetric 计算缓存命中率 func (c *wrapperCache) emitHitCachedMetric(hit bool) { } func (c *wrapperCache) GetData(ctx context.Context, key string, callback func(ctx context.Context) (interface{}, error)) (v interface{}, err error) { if result, isExist := c.cache.Get(key); isExist { c.emitHitCachedMetric(true) return result, nil } if callback == nil { return nil, localCacheCallbackIsNil } c.emitHitCachedMetric(false) result, err, _ := c.singleflight.Do(key, func() (interface{}, error) { // 双重检测,防止singleflight 锁的key失效 if result, isExist := c.cache.Get(key); isExist { return result, nil } var callBackData interface{} if err := backoff.Retry(func() error { if data, err := callback(ctx); err != nil { return err } else { callBackData = data return nil } }, backoff.WithMaxRetries(backoff.NewConstantBackOff(c.retrySleepTime), c.retryNum)); err != nil { // todo add log return nil, err } c.cache.Set(key, callBackData) return callBackData, nil }) if err != nil { return nil, err } return result, nil }cache 实现
这里介绍一下 sync.Map 为一个无过期的本地缓存和go-cache 有ttl的缓存框架!或者你自己去实现一个也可以!import ( "sync" "github.com/patrickmn/go-cache" ) type localCache struct { sync.Map } func (l *localCache) Get(key string) (value interface{}, isExist bool) { return l.Load(key) } func (l *localCache) Set(key string, value interface{}, opts ...CacheOption) { l.Store(key, value) } type goCache struct { *cache.Cache } func (l goCache) Set(key string, value interface{}, opts ...CacheOption) { l.SetDefault(key, value) }测试用例 import ( "context" "strconv" "sync" "sync/atomic" "testing" "time" "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" ) func TestNewCached(t *testing.T) { cached := NewWrapperCache("test", goCache{ Cache: cache.New(time.Second*10, time.Second*30), }) //cached := NewWrapperCache("test", &localCache{}) ctx := context.Background() wg := sync.WaitGroup{} var ( loadTime uint64 = 0 currG = 20 ) wg.Add(currG) for x := 0; x < currG; x++ { go func(x int) { defer wg.Done() for y := 0; y < 200000; y++ { key := y % 10 result, err := cached.GetData(ctx, strconv.Itoa(key), func(ctx context.Context) (interface{}, error) { atomic.AddUint64(&loadTime, 1) t.Logf("load key: %s, num: %d, g_id: %d ", strconv.Itoa(key), y, x) return int(key), nil }) if err != nil { t.Fatal(err) } if result.(int) != key { t.Fatal("data is not eq err") } } }(x) } wg.Wait() for x := 0; x < 10; x++ { result, _ := cached.GetData(ctx, strconv.Itoa(x), nil) t.Log(result) assert.Equal(t, result.(int), int(x)) } assert.Equal(t, int(loadTime), int(10)) }
最近爆火的sanag气传导蓝牙耳机上手快玩体验骨传导耳机相信大家并不陌生,很多人都在用,其有着良好的亲和力,大多数环境与空间中使用,都有着良好的佩戴体验,运动中又能兼顾周围突发的环境音,当然很多用骨传导耳机的人都深有体会的就是
华为回应与大众组建合资公司研发自动驾驶消息不属实澎湃新闻记者周玲10月20日,有媒体报道称,华为计划与全球汽车巨头大众集团组建合资公司,研发自动驾驶技术。对此,华为回应称,此消息不属实。有媒体援引消息人士的话报道称,华为和大众组
官方优化果然不同,一加9RT拿下原神测试榜双第一昨天公布的原神测试榜单显示,一加9RT以帧率抖动第一,发热最高温度控制第一,平均功耗第四的成绩领跑榜单。这代表什么意思呢?原神玩家都知道,目前想要达到完全不卡顿的效果几乎是不可能的
家里wifi信号差怎么办?家里的wifi为什么老是感觉网速慢?学会这几点让网速变快一大截。摆放位置要把路由器摆放在空旷的地方,比如客厅和主卧,尽量不要让wifi信号穿过太多墙壁和门,另外,金属物体放置在路由
新房装修好了,家里买个扫地机器人到底实不实用呢?新房子装修好了家里经济条件好当然可以购买一台扫地机器人实用。现在的扫地机器人比较多,你可以在网上搜索找一款性价比比较高的扫地机器人。这里本人推荐一款价格为2800元钱的科沃斯机器人
姚期智院士畅谈数字经济核心技术首次介绍数据定价算法18日,图灵奖获得者中国科学院院士清华大学交叉信息学院院长交叉信息核心技术研究院院长姚期智出席了由中央企业数字化发展研究院举办的数字化转型行动学习会议,并做了数字经济领域的核心科技
复古蓝牙音箱上手体验,它找到了时代和记忆的平衡科技因为情怀的注入,变得更有温度时光因为音乐的渗透,而显得韵味十足。这可能是很多影音厂商推出复古音箱的一个初衷吧,本期视频要跟大家分享这款漫步者M230的体验,一起来看看它是否能唤
你们的电脑都安装杀毒软件了吗?会有人攻击私人的电脑吗?我来回答一下这个问题,杀毒软件一般统称为安全软件,杀毒只是其其中一个重要功能,现在网络上的病毒软件相对已经没有以前那么多了,但是流氓软件越来越多,一不小心就被安装了全家桶,搞的一个
天猫精灵可以连接电脑吗?天猫精灵是马云研发的并且在7月5日推出,天猫精灵是把强大的人工智能,塞进了一个杯子大小的硬件。马云突然地进军硬件,让苹果三星也措手不及。这款机器人的背后是一个开放系统,因为它整合了
苹果x信号差怎么办?x的信号不好,我还真不知道,你要是说xr更不好我相信,其实整体来说,iPhone的信号确实差,没有诋毁的意思哦,实事求是而已。我都是站在客观的立场发言已已。苹果确实整个生态系统来说
长期带耳机会有什么影响?长期戴耳机的危害1。损害听力长期戴耳机会损害听力,这点并不难理解,当我们的外耳道口被耳机紧紧堵塞住,高音量的音频声会直接进入耳内而损伤听力,长期戴耳机会对我们的耳蜗造成持续冲击,形