Golang+Redis分布式可重入锁

概念 计算机科学中,可重入互斥锁(英語:reentrant mutex)是互斥锁的一种,同一线程对其多次加锁不会产生死锁。可重入互斥锁也称递归互斥锁(英語:recursive mutex)或递归锁(英語:recursive lock)。 如果对已经上锁的普通互斥锁进行「加锁」操作,其结果要么失败,要么会阻塞至解锁。而如果换作可重入互斥锁,当且仅当尝试加锁的线程就是持有该锁的线程时,类似的加锁操作就会成功。可重入互斥锁一般都会记录被加锁的次数,只有执行相同次数的解锁操作才会真正解锁。 递归互斥锁解决了普通互斥锁不可重入的问题:如果函数先持有锁,然后执行回调,但回调的内容是调用它自己,就会产生死锁。 参考维基百科:可重入互斥锁 个人观点 在Go中应该很少会有这样的场景,互斥锁从字面上理解,应该不能接收重入,需要重入的场景也不应该考虑互斥锁。个人认为更好的解决方法是从设计的层面避免这种场景的出现。因此,与基于redis的互斥锁不同,这篇文章仅仅是尝试在技术上的实现,在实际应用中应尽可能避免这样的场景出现 参考 功能 在基于redis的互斥锁(自动续期,自动重试)的基础上允许重入 实现的关键功能点: 加锁:同一线程多次加锁时可以通过某个标识识别该线程为当前持有锁的线程,并且加锁次数+1 解锁:解锁时加锁次数-1,直到次数为0,则可以解锁(DEL) hash锁的结构 Thread KEY FIELD VALUE A EXAMPLE_LOCK 304597349587439(线程对应的随机数,标识锁,防止误解锁) 1(当前线程已加锁次数) 基本流程 在不可重入锁的实现里,只需要关心锁的互斥,误解除和自动续期,因此可以直接使用string类型配合SETNX,PEXPIRE,DEL完成加锁,解锁和续期 但可重入锁需要锁可以记录当前线程的标识和当前线程已加锁次数,就需要用redis的hash代替string。因为结构发生了变化,所以在加锁,解锁流程上也会有相应改变 Time ThreadA ThreadB T1 尝试加锁 尝试加锁 T2 加锁成功(key:EXAMPLE_LOCK,field:304597349587439,value:1) 加锁失败 T3 执行当前方法业务代码 尝试重试加锁并等待ThreadA解锁(根据配置间隔和最大重试次数) T4 执行另一个方法业务代码,也可能是递归调用,并再次尝试加锁 T5 加锁成功(key:EXAMPLE_LOCK,field:304597349587439,value:2) T6 执行新的调用方法内的业务代码,直到完成所有嵌套调用 T7 从最里层调用开始解锁,(key:EXAMPLE_LOCK,field:304597349587439,value:1) T8 返回到最外层第一次加锁的位置,解锁(key:EXAMPLE_LOCK,field:304597349587439,value:0) T9 如果当前已加锁次数为0,释放锁 T10 加锁成功 FF 加锁: -- KEYS[1]:锁对应的key -- ARGV[1]:锁的expire -- ARGV[2]:锁对应的计数器field(随机值,防止误解锁),记录当前线程已加锁的次数 -- 判断锁是否空闲 if (redis.call('EXISTS', KEYS[1]) == 0) then -- 线程首次加锁(锁的初始化,值和过期时间) redis.call('HINCRBY', KEYS[1], ARGV[2], 1); redis.call('PEXPIRE', KEYS[1], ARGV[1]); return 1; end; -- 判断当前线程是否持有锁(锁被某个线程持有,通常是程序第N次(N>1)在线程内调用时会执行到此处) if (redis.call('HEXISTS', KEYS[1], ARGV[2]) == 1) then -- 调用次数递增 redis.call('HINCRBY', KEYS[1], ARGV[2], 1); -- 不处理续期,通过守护线程续期 return 1; end; -- 锁被其他线程占用,加锁失败 return 0; 解锁: ...

May 3, 2021 · 4 min · 852 words

Golang+Redis分布式互斥锁

引言 假设我们的某个业务会涉及数据更新,同时在实际场景中有较大并发量。流程:读取->修改->保存,在不考虑基于DB层的并发处理情况下,这种场景可能对部分数据造成不可预期的执行结果,此时可以考虑使用分布式锁来解决该问题 需要解决的问题 锁的误解除 业务执行超时导致并发 重试机制 GET和DEL非原子性 代码 目录结构: │ main.go │ └─demo lock.go lock.go: package demo import ( "context" "fmt" "github.com/go-redis/redis/v8" "math/rand" "time" ) // 重试次数 var retryTimes = 5 // 重试频率 var retryInterval = time.Millisecond * 50 var rdb = redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // no password set DB: 0, // use default DB }) // 锁的默认过期时间 var expiration time.Duration // 模拟分布式业务加锁场景 func MockTest(tag string) { var ctx, cancel = context.WithCancel(context.Background()) defer func() { // 停止goroutine cancel() }() // 随机value lockV := getRandValue() lockK := "EXAMPLE_LOCK" // 默认过期时间 expiration = time.Millisecond * 200 fmt.Println(tag + "尝试加锁") set, err := rdb.SetNX(ctx, lockK, lockV, expiration).Result() if err != nil { panic(err.Error()) } // 加锁失败,重试 if set == false && retry(ctx, rdb, lockK, lockV, expiration, tag) == false { fmt.Println(tag + " server unavailable, try again later") return } fmt.Println(tag + "成功加锁") // 加锁成功,新增守护线程 go watchDog(ctx, rdb, lockK, expiration, tag) // 处理业务(通过随机时间延迟模拟) fmt.Println(tag + "等待业务处理完成...") time.Sleep(getRandDuration()) // 业务处理完成 // 释放锁 val := delByKeyWhenValueEquals(ctx, rdb, lockK, lockV) fmt.Println(tag+"释放结果:", val) } // 释放锁 func delByKeyWhenValueEquals(ctx context.Context, rdb *redis.Client, key string, value interface{}) bool { lua := ` -- 如果当前值与锁值一致,删除key if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end ` scriptKeys := []string{key} val, err := rdb.Eval(ctx, lua, scriptKeys, value).Result() if err != nil { panic(err.Error()) } return val == int64(1) } // 生成随机时间 func getRandDuration() time.Duration { rand.Seed(time.Now().UnixNano()) min := 50 max := 100 return time.Duration(rand.Intn(max-min)+min) * time.Millisecond } // 生成随机值 func getRandValue() int { rand.Seed(time.Now().UnixNano()) return rand.Int() } // 守护线程 func watchDog(ctx context.Context, rdb *redis.Client, key string, expiration time.Duration, tag string) { for { select { // 业务完成 case <-ctx.Done(): fmt.Printf("%s任务完成,关闭%s的自动续期\n", tag, key) return // 业务未完成 default: // 自动续期 rdb.PExpire(ctx, key, expiration) // 继续等待 time.Sleep(expiration / 2) } } } // 重试 func retry(ctx context.Context, rdb *redis.Client, key string, value interface{}, expiration time.Duration, tag string) bool { i := 1 for i <= retryTimes { fmt.Printf(tag+"第%d次尝试加锁中...\n", i) set, err := rdb.SetNX(ctx, key, value, expiration).Result() if err != nil { panic(err.Error()) } if set == true { return true } time.Sleep(retryInterval) i++ } return false } 流程说明 假设MockTest方法就是业务处理方法 ...

May 2, 2021 · 3 min · 440 words

使用redis解决并发引起的业务问题

上周线上项目遇到了一点问题,处理完成后,在这做个小结 相关功能 后台可通过配置发起拼团, 每个拼团可指定同一个用户的最大参与次数 遇到的问题 由于并发, 导致了代码在有前置判断用户拼团是否超限的前提下, 仍然可能出现拼团次数超限的情况 数据表结构 -- ---------------------------- -- Table structure for logs -- ---------------------------- DROP TABLE IF EXISTS `logs` ; CREATE TABLE `logs` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 'pk', `act_id` int(6) unsigned NOT NULL DEFAULT '0' COMMENT '', `user_id` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '', `pay_amount` int(6) unsigned NOT NULL DEFAULT '0' COMMENT '', `cat` tinyint(1) unsigned NOT NULL DEFAULT '2' COMMENT '', `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '', `form_id` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '', utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '', PRIMARY KEY ( `id` ) USING BTREE, KEY `idx_userId` ( `user_id` ) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='用户拼团记录'; 原有代码 // 开始处理请求 // 判断用户购买次数是否超限(通过 `act_id` && `user_id` )(当前N个并发请求同时通过验证,N>限制次数-用户已拼团次数) // 开启事务 // 写入用户请求日志 // 提交事务(已写入N条数据,N>限制次数-用户已拼团次数) // 请求处理完成 以上代码无法避免重复写入超过限制次数的用户拼团记录 ...

November 17, 2019 · 2 min · 350 words

redis在目前项目的部分应用场景

场景一:设备在线状态保持 需求 有若干台设备,设备具备定时向server发送heartbeat的功能,后台需要在设备列表实时监控设备的在线情况(允许有不大于5分钟的时间延迟) 实现 在openresty配置中加入用于处理心跳请求的location块,如下 location /device/heartbeat { default_type text/html; lua_need_request_body on; content_by_lua_file src/heartbeat_detector.lua; } lua脚本 -- 获取请求方式,只允许POST请求 local request_method = ngx.var.request_method -- ngx.log(ngx.ERR, "headers:"..ngx.var.http_user_agent) if request_method ~= "POST" then ngx.exit(ngx.HTTP_NOT_ALLOWED) end local function close_redis(red) if not red then return end --释放连接(连接池实现) local pool_max_idle_time = 10000 --毫秒 local pool_size = 100 --连接池大小 local ok, err = red:set_keepalive(pool_max_idle_time, pool_size) if not ok then ngx.log(ngx.ERR, "set redis keepalive error : ", err) end end -- 引入json处理库 local cjson = require "cjson" -- 临时文件读取函数 function getFile(file_name) local f = assert(io.open(file_name, 'r')) local string = f:read("*all") f:close() return string end -- 获取请求体 ngx.req.read_body() -- 获取请求body local data = ngx.req.get_body_data() -- 如果请求body为空,则从临时文件获取 if nil == data then local file_name = ngx.req.get_body_file() if file_name then data = getFile(file_name) end end -- 将data转换为object local obj = cjson.decode(data) -- 如果当前请求为心跳维持请求 if obj.Name == "KeepAlive" then --[[ 初始化redis ]] local redis = require "resty.redis" local red = redis:new() red:set_timeout(1000) local host = 'ip' local port = port local ok, err = red:connect(host,port) if not ok then return close_redis(red) end -- 请注意这里 auth 的调用过程 local count ount, err = red:get_reused_times() if 0 == count then ok, err = red:auth("password") if not ok then ngx.say("failed to auth: ", err) return end elseif err then ngx.say("failed to get reused times: ", err) return end -- redis前缀 local prefix = 'device_heartbeat:' -- 缓存心跳 res, err = red:set(prefix..obj.DeviceId, 1, 'EX', 300) end 需要保证在项目的已有路由中没有/device/heartbeat,此时/device/heartbeat的请求会交给lua脚本处理,相对于php性能消耗更少,php后台获取到数据后可直接在redis中查找该设备对应的心跳key是否存在来确认设备是否在线 ...

April 3, 2019 · 3 min · 613 words

laravel基于redis的分布式秒杀系统

场景 本文暂不讨论前端页面,cdn在秒杀上的性能优化,只关注从用户请求到达web服务器开始直至秒杀完成在redis中生成订单结束这个阶段的实现,后续还需要使用redis队列异步生成mysql订单实现数据的持久化 实现 为了方便测试结果,当前本地的测试环境如下: web服务器 使用了openresty监听本地的80端口,并代理到3台负载均衡服务器,由负载均衡服务器调用php-fpm实际处理所有请求的业务 nginx.conf中加入 upstream test { server localhost:16888; server localhost:16889; server localhost:16890; } conf.d/default.conf中加入 server { listen 80; server_name localhost; location / { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_pass http://test; } } server { listen 16888; server_name localhost; access_log /var/log/nginx/balancer-1.access.log main; error_log /var/log/nginx/balancer-1.error.log warn; root /data/www/community/public; location / { access_by_lua_block{ -- request header方便后台分辨请求来源服务器 ngx.req.set_header('balancer', 'balancer-1') -- response header方便客户端查看当前请求由哪个服务器处理(仅测试用) ngx.header['balancer'] = 'balancer-1' } try_files $uri $uri/ /index.php?$query_string; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Host $http_host; proxy_set_header X-NginX-Proxy true; index index.html index.htm index.php; } location ~ \.php$ { fastcgi_pass php72:9000; fastcgi_index index.php; fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name; include fastcgi_params; } } server { listen 16889; server_name localhost; access_log /var/log/nginx/balancer-2.access.log main; error_log /var/log/nginx/balancer-2.error.log warn; root /data/www/community/public; location / { access_by_lua_block{ ngx.req.set_header('balancer', 'balancer-2') ngx.header['balancer'] = 'balancer-2' } try_files $uri $uri/ /index.php?$query_string; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Host $http_host; proxy_set_header X-NginX-Proxy true; index index.html index.htm index.php; } location ~ \.php$ { fastcgi_pass php72:9000; fastcgi_index index.php; fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name; include fastcgi_params; } } server { listen 16890; server_name localhost; access_log /var/log/nginx/balancer-3.access.log main; error_log /var/log/nginx/balancer-3.error.log warn; root /data/www/community/public; location / { access_by_lua_block{ ngx.req.set_header('balancer', 'balancer-3') ngx.header['balancer'] = 'balancer-3' } try_files $uri $uri/ /index.php?$query_string; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Host $http_host; proxy_set_header X-NginX-Proxy true; index index.html index.htm index.php; } location ~ \.php$ { fastcgi_pass php72:9000; fastcgi_index index.php; fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name; include fastcgi_params; } } 同时开放本地的16888,16889,16890接口 ...

March 8, 2019 · 2 min · 415 words