什么场景需要加锁
在并发的场景下经常会出现多个请求同时打进服务,导致出现资源抢占的问题,比如库存、匹配的场景。
举个库存的栗子,请求A和请求B同时进行下单操作,导致两个请求在获取库存余量的时候都是原始值,分别进行了库存-1的update操作,
但是最后我们会发现,原本库存应该减2,实际上只减了1,这就是并发带来的读写不一致问题,这时候就需要加锁操作。
悲观锁
悲观锁认为任何时候都会有并发的资源抢占,也可以理解为独占锁,所以在加锁期间别的请求会处于等待中并重复请求该锁是否被释放。
实现方案
- 直接通过内存加锁 asyncLock
这种加锁方式是直接在内存里通过变量控制,将请求的执行回调通过一个key保存在队列(FIFO)里,
比如请求A进来:
- 如果队列为空,将回调A打进队列,并执行回调A,回调A执行完后会检查该队列中是否存在别的任务,如果存在则会循环调用
- 如果队列不为空,将回调A打进队列,等待前面的循环调用
直到该队列没有任务了,会通过key删除这个队列
看个简单的demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| class Queue { private pending: boolean = false public list: any[] = []
setPending(val) { this.pending = val }
register(fn) { return new Promise(async (resolve, reject) => { this.list.push([fn, resolve, reject]) if (!this.pending) await this.execute() }) }
async execute() { this.setPending(true) const [fn, resolve, reject] = this.list.pop() await fn(resolve, reject) this.setPending(false) if (this.list.length) await this.execute() } }
|
1 2 3 4 5 6 7 8 9 10 11
| const queue = new Queue() router.post('/xxx', async ctx => { const result = await queue.register(async (resolve, reject) => { await sleep(2000) resolve() }) ctx.status = 200 ctx.body = { data: result, } })
|
这种加锁的方式优点成本比较低,对于并发不高的业务可以使用,但是缺点也很明显
- 任务都是串行的,并发多了会造成阻塞
- 对于集群服务而言内存是不共享的,多台机器就不能这么玩了
- 引入redis锁
redis加分布式锁方法是通过原子操作实现的,众所周知原子操作是不可分割的,在执行完毕之前不会被任何其它任务或事件中断。
所以我们在请求A打进来的时候对库存操作的加锁,这样请求B进来会先去读锁是否被释放,等锁释放了再去操作库存,这样就能够避免读写不一样的问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| class RedisLock { private expireTime: number private lockTimeout: number private expireUnit: 'PX' | 'EX' private expireMode: 'NX' | 'XX' private client: any
constructor(client, options: LockOption = {}) { if (!client) throw new Error('缺少redis客户端')
this.expireTime = options.expireTime || 2 this.lockTimeout = options.lockTimeout || 5 this.expireUnit = options.expireUnit || 'EX' this.expireMode = options.expireMode || 'NX' this.client = client }
async lock(key, val, expire?) { const self = this
return (async function retry() { try { const result = await self.client.set(key, val, self.expireUnit, expire || self.expireTime, self.expireMode) if (result === 'OK') { return true }
await new Promise(resolve => setTimeout(resolve, 200))
return retry() } catch (e) { console.log(e) } })() }
async unlock(key, val) { const script = "if redis.call('get',KEYS[1]) == ARGV[1] then" + " return redis.call('del',KEYS[1]) " + "else" + " return 0 " + "end"
try { const result = await this.client.eval(script, 1, key, val) return result === 1 } catch (e) { console.log(e) return false } } }
|
上面的代码加锁的时候使用了’NX’,保证锁存在时再有进程进行加锁会执行失败,且解锁过程是需要执行lua脚本进行原子操作
如何使用
1 2 3 4 5 6
| const lock = new RedisLock(redis) router.post('/xxx', async ctx => { await lock.lock('key', 'val') await sleep(xxx) await lock.unlock('key', 'val') })
|
使用redis分布式锁的好处是redis读取速度快且能在集群中使用,当然并发量高了也能在使用redis加锁的同时做一层redis缓存提升服务性能。
乐观锁
乐观锁是一种极其乐观的加锁方式,它认为不会出现资源抢占的情况,常见的策略比如CAS(Compare and Swap),在执行写操作前我们记录数据的初始值和预期值,
更新时check一下初始值和数据库当前的值 如果一样就将预期值更新进去,否则就认为是过期请求,再次尝试。
乐观锁的使用成本比较高,因为会出现ABA的情况,库存可能被请求B从3修改成2,又从2修改成3,这样A的原始值和预期值是一样的,就需要在表里增加version标记每次更新的版本。