抑制并发请求-singleflight

抑制并发请求-singleflight

背景

高并发的场景下,经常会出现并发重复请求资源的情况。

比如说,缓存失效时,我们去请求db获取最新的数据,如果这个key是一个热key,那么在缓存失效的瞬间,可能会有大量的并发请求访问到db,导致db访问量陡增,甚至是打崩db,这种场景也就是我们常说的缓存击穿。

并发请求资源

针对同一个key的并发请求,这些请求和响应实际上都是一样的。所以我们可以把这种并发请求优化为:只进行一次实际请求去访问资源,然后得到实际响应,所有的并发请求共享这个实际响应的结果

针对分布式场景,我们可以使用分布式锁来实现

针对单机场景,我们可以使用singleflight来实现

singleflight

singleflight

singleflight是golang内置的一个包,这个包提供了对重复函数调用的抑制功能,也就是保证并发请求只会有一个实际请求去访问资源,所有并发请求共享实际响应。

使用

singleflight在golang sdk源码中的路径为:src/internal/singleflight

但是internal是golang sdk内部的包,所以我们不能直接去使用

使用步骤:

  • 引入go mod
  • 使用singleflight包
  • 引入go mod

    go get golang.org/x/sync

    使用singleflight包

    singleflight包主要提供了三个方法

    // 方法作用:保证并发请求只会执行一次函数,并共享实际响应// 请求参数// key:请求的唯一标识,相同的key会被视为并发请求// fn:实际需要执行的函数// 响应参数// v:实际执行函数的返回值// err:实际执行函数的错误// shared:返回值v是否被共享,若存在并发请求,则为true;若不存在并发请求则为falsefunc (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool)// 方法作用:和Do类似,不过方法返回的是chanfunc (g *Group) DoChan(key string, fn func() (any, error)) (<-chan Result, bool)// 方法作用:删除key,一般来说不会直接使用这个方法func (g *Group) ForgetUnshared(key string) bool

    针对以上的三个方法,我们重点了解一下Do方法的使用即可

    没有使用singleflight之前

    package mainimport ( “fmt” “sync” “testing” “time”)var ( mx sync.Mutex wg sync.WaitGroup cacheData = make(map[string]string, 0))func TestSingleFlight(t *testing.T) { // 添加10个任务,模拟并发请求 wg.Add(10) for i := 0; i < 10; i++ { go getData("demo") } // 等待所有任务完成 wg.Wait()}func getData(key string) { data, _ := getDataFromCache(key) if len(data) == 0 { // 缓存没有找到,则进行回源 data, _ = getDataFromDB(key) // 设置缓存 mx.Lock() cacheData[key] = data mx.Unlock() } fmt.Println(data) // 任务完成 wg.Done()}func getDataFromCache(key string) (string, error) { return cacheData[key], nil}func getDataFromDB(key string) (string, error) { fmt.Println("getDataFromDB key: ", key) // 模拟访问db的耗时 time.Sleep(10 * time.Millisecond) return "db data", nil}

    执行TestSingleFlight函数后,会发现并发请求多次调用了getDataFromDB函数

    使用singleflight之后

    package mainimport ( “fmt” “golang.org/x/sync/singleflight” “sync” “testing” “time”)var ( mx sync.Mutex wg sync.WaitGroup g singleflight.Group cacheData = make(map[string]string, 0))func TestSingleFlight(t *testing.T) { // 添加10个任务 wg.Add(10) for i := 0; i < 10; i++ { go getDataSingleWarp("demo") } // 等待所有任务完成 wg.Wait()}func getDataSingleWarp(key string) { data, _ := getDataFromCache(key) if len(data) == 0 { // 使用singleflight来避免并发请求,实际改动就这一行 d, _, shared := g.Do(key, func() (interface{}, error) { return getDataFromDB(key) }) fmt.Println(shared) data = d.(string) // 设置缓存 mx.Lock() cacheData[key] = data mx.Unlock() } fmt.Println(data) wg.Done()}func getDataFromCache(key string) (string, error) { return cacheData[key], nil}func getDataFromDB(key string) (string, error) { fmt.Println("getDataFromDB key: ", key) // 模拟访问db的耗时 time.Sleep(10 * time.Millisecond) return "db data", nil}

    执行TestSingleFlight函数后,会发现只调用了一次getDataFromDB函数

    源码分析

    • Group struct:封装并发请求
    • call struct:每一个需要执行的函数,都会被封装成一个call
    • func Do:对并发请求进行控制的方法

    // Copyright 2013 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.// Package singleflight provides a duplicate function call suppression// mechanism.package singleflight // import “golang.org/x/sync/singleflight”import ( “bytes” “errors” “fmt” “runtime” “runtime/debug” “sync”)// errGoexit indicates the runtime.Goexit was called in// the user given function.var errGoexit = errors.New(“runtime.Goexit was called”)// A panicError is an arbitrary value recovered from a panic// with the stack trace during the execution of given function.type panicError struct { value interface{} stack []byte}// Error implements error interface.func (p *panicError) Error() string { return fmt.Sprintf(“%v%s”, p.value, p.stack)}func newPanicError(v interface{}) error { stack := debug.Stack() // The first line of the stack trace is of the form “goroutine N [status]:” // but by the time the panic reaches Do the goroutine may no longer exist // and its status will have changed. Trim out the misleading line. if line := bytes.IndexByte(stack[:], ”); line >= 0 { stack = stack[line+1:] } return &panicError{value: v, stack: stack}}// call is an in-flight or completed singleflight.Do calltype call struct { // 保证相同key,只会进行一次实际请求 // 相同key的并发请求会共享返回 wg sync.WaitGroup // These fields are written once before the WaitGroup is done // and are only read after the WaitGroup is done. // 实际执行函数的返回值和错误 val interface{} err error // forgotten indicates whether Forget was called with this call’s key // while the call was still in flight. // 是否已删除当前并发请求的key forgotten bool // These fields are read and written with the singleflight // mutex held before the WaitGroup is done, and are read but // not written after the WaitGroup is done. // 并发请求的次数 dups int chans []chan 0}// DoChan is like Do but returns a channel that will receive the// results when they are ready.//// The returned channel will not be closed.func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan 0 { go panic(e) select {} // Keep this goroutine around so that it will appear in the crash dump. } else { panic(e) } } else if c.err == errGoexit { // Already in the process of goexit, no need to call again } else { // Normal return for _, ch := range c.chans { ch 0} } } }() // 匿名函数立即执行 func() { defer func() { if !normalReturn { // Ideally, we would wait to take a stack trace until we've determined // whether this is a panic or a runtime.Goexit. // // Unfortunately, the only way we can distinguish the two is to see // whether the recover stopped the goroutine from terminating, and by // the time we know that, the part of the stack trace relevant to the // panic has been discarded. if r := recover(); r != nil { c.err = newPanicError(r) } } }() // 执行实际函数 c.val, c.err = fn() // 正常返回 normalReturn = true }() if !normalReturn { recovered = true }}// Forget tells the singleflight to forget about a key. Future calls// to Do for this key will call the function rather than waiting for// an earlier call to complete.func (g *Group) Forget(key string) { g.mu.Lock() if c, ok := g.m[key]; ok { c.forgotten = true } delete(g.m, key) g.mu.Unlock()}

    郑重声明:本文内容及图片均整理自互联网,不代表本站立场,版权归原作者所有,如有侵权请联系管理员(admin#wlmqw.com)删除。
    (0)
    用户投稿
    上一篇 2022年6月19日
    下一篇 2022年6月19日

    相关推荐

    联系我们

    联系邮箱:admin#wlmqw.com
    工作时间:周一至周五,10:30-18:30,节假日休息