Golang+Redis分布式可重入锁

概念 计算机科学中,可重入互斥锁(英語:reentrant mutex)是互斥锁的一种,同一线程对其多次加锁不会产生死锁。可重入互斥锁也称递归互斥锁(英語:recursive mutex)或递归锁(英語:recursive lock)。 如果对已经上锁的普通互斥锁进行「加锁」操作,其结果要么失败,要么会阻塞至解锁。而如果换作可重入互斥锁,当且仅当尝试加锁的线程就是持有该锁的线程时,类似的加锁操作就会成功。可重入互斥锁一般都会记录被加锁的次数,只有执行相同次数的解锁操作才会真正解锁。 递归互斥锁解决了普通互斥锁不可重入的问题:如果函数先持有锁,然后执行回调,但回调的内容是调用它自己,就会产生死锁。 参考维基百科:可重入互斥锁 个人观点 在Go中应该很少会有这样的场景,互斥锁从字面上理解,应该不能接收重入,需要重入的场景也不应该考虑互斥锁。个人认为更好的解决方法是从设计的层面避免这种场景的出现。因此,与基于redis的互斥锁不同,这篇文章仅仅是尝试在技术上的实现,在实际应用中应尽可能避免这样的场景出现 参考 功能 在基于redis的互斥锁(自动续期,自动重试)的基础上允许重入 实现的关键功能点: 加锁:同一线程多次加锁时可以通过某个标识识别该线程为当前持有锁的线程,并且加锁次数+1 解锁:解锁时加锁次数-1,直到次数为0,则可以解锁(DEL) hash锁的结构 Thread KEY FIELD VALUE A EXAMPLE_LOCK 304597349587439(线程对应的随机数,标识锁,防止误解锁) 1(当前线程已加锁次数) 基本流程 在不可重入锁的实现里,只需要关心锁的互斥,误解除和自动续期,因此可以直接使用string类型配合SETNX,PEXPIRE,DEL完成加锁,解锁和续期 但可重入锁需要锁可以记录当前线程的标识和当前线程已加锁次数,就需要用redis的hash代替string。因为结构发生了变化,所以在加锁,解锁流程上也会有相应改变 Time ThreadA ThreadB T1 尝试加锁 尝试加锁 T2 加锁成功(key:EXAMPLE_LOCK,field:304597349587439,value:1) 加锁失败 T3 执行当前方法业务代码 尝试重试加锁并等待ThreadA解锁(根据配置间隔和最大重试次数) T4 执行另一个方法业务代码,也可能是递归调用,并再次尝试加锁 T5 加锁成功(key:EXAMPLE_LOCK,field:304597349587439,value:2) T6 执行新的调用方法内的业务代码,直到完成所有嵌套调用 T7 从最里层调用开始解锁,(key:EXAMPLE_LOCK,field:304597349587439,value:1) T8 返回到最外层第一次加锁的位置,解锁(key:EXAMPLE_LOCK,field:304597349587439,value:0) T9 如果当前已加锁次数为0,释放锁 T10 加锁成功 FF 加锁: -- KEYS[1]:锁对应的key -- ARGV[1]:锁的expire -- ARGV[2]:锁对应的计数器field(随机值,防止误解锁),记录当前线程已加锁的次数 -- 判断锁是否空闲 if (redis.call('EXISTS', KEYS[1]) == 0) then -- 线程首次加锁(锁的初始化,值和过期时间) redis.call('HINCRBY', KEYS[1], ARGV[2], 1); redis.call('PEXPIRE', KEYS[1], ARGV[1]); return 1; end; -- 判断当前线程是否持有锁(锁被某个线程持有,通常是程序第N次(N>1)在线程内调用时会执行到此处) if (redis.call('HEXISTS', KEYS[1], ARGV[2]) == 1) then -- 调用次数递增 redis.call('HINCRBY', KEYS[1], ARGV[2], 1); -- 不处理续期,通过守护线程续期 return 1; end; -- 锁被其他线程占用,加锁失败 return 0; 解锁: ...

May 3, 2021 · 4 min · 852 words

Golang+Redis分布式互斥锁

引言 假设我们的某个业务会涉及数据更新,同时在实际场景中有较大并发量。流程:读取->修改->保存,在不考虑基于DB层的并发处理情况下,这种场景可能对部分数据造成不可预期的执行结果,此时可以考虑使用分布式锁来解决该问题 需要解决的问题 锁的误解除 业务执行超时导致并发 重试机制 GET和DEL非原子性 代码 目录结构: │ main.go │ └─demo lock.go lock.go: package demo import ( "context" "fmt" "github.com/go-redis/redis/v8" "math/rand" "time" ) // 重试次数 var retryTimes = 5 // 重试频率 var retryInterval = time.Millisecond * 50 var rdb = redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // no password set DB: 0, // use default DB }) // 锁的默认过期时间 var expiration time.Duration // 模拟分布式业务加锁场景 func MockTest(tag string) { var ctx, cancel = context.WithCancel(context.Background()) defer func() { // 停止goroutine cancel() }() // 随机value lockV := getRandValue() lockK := "EXAMPLE_LOCK" // 默认过期时间 expiration = time.Millisecond * 200 fmt.Println(tag + "尝试加锁") set, err := rdb.SetNX(ctx, lockK, lockV, expiration).Result() if err != nil { panic(err.Error()) } // 加锁失败,重试 if set == false && retry(ctx, rdb, lockK, lockV, expiration, tag) == false { fmt.Println(tag + " server unavailable, try again later") return } fmt.Println(tag + "成功加锁") // 加锁成功,新增守护线程 go watchDog(ctx, rdb, lockK, expiration, tag) // 处理业务(通过随机时间延迟模拟) fmt.Println(tag + "等待业务处理完成...") time.Sleep(getRandDuration()) // 业务处理完成 // 释放锁 val := delByKeyWhenValueEquals(ctx, rdb, lockK, lockV) fmt.Println(tag+"释放结果:", val) } // 释放锁 func delByKeyWhenValueEquals(ctx context.Context, rdb *redis.Client, key string, value interface{}) bool { lua := ` -- 如果当前值与锁值一致,删除key if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end ` scriptKeys := []string{key} val, err := rdb.Eval(ctx, lua, scriptKeys, value).Result() if err != nil { panic(err.Error()) } return val == int64(1) } // 生成随机时间 func getRandDuration() time.Duration { rand.Seed(time.Now().UnixNano()) min := 50 max := 100 return time.Duration(rand.Intn(max-min)+min) * time.Millisecond } // 生成随机值 func getRandValue() int { rand.Seed(time.Now().UnixNano()) return rand.Int() } // 守护线程 func watchDog(ctx context.Context, rdb *redis.Client, key string, expiration time.Duration, tag string) { for { select { // 业务完成 case <-ctx.Done(): fmt.Printf("%s任务完成,关闭%s的自动续期\n", tag, key) return // 业务未完成 default: // 自动续期 rdb.PExpire(ctx, key, expiration) // 继续等待 time.Sleep(expiration / 2) } } } // 重试 func retry(ctx context.Context, rdb *redis.Client, key string, value interface{}, expiration time.Duration, tag string) bool { i := 1 for i <= retryTimes { fmt.Printf(tag+"第%d次尝试加锁中...\n", i) set, err := rdb.SetNX(ctx, key, value, expiration).Result() if err != nil { panic(err.Error()) } if set == true { return true } time.Sleep(retryInterval) i++ } return false } 流程说明 假设MockTest方法就是业务处理方法 ...

May 2, 2021 · 3 min · 440 words