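Concurrency control for tasks running on multiple instances
Background
In-process locks such as synchronized and Lock objects only coordinate threads inside a single process; they cannot control concurrency across processes or instances.
Solutions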
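DB-based
- Lock table, relying on a unique key
Lock: INSERT INTO database_lock(resource, description) VALUES (1, 'lock');
Unlock: DELETE FROM database_lock WHERE resource=1;
Simple but unreliable: there is no lock cleanup mechanism, so if the holder crashes before deleting the row, the lock is never released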
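- Pessimistic lock, based on a transaction: open a transaction and acquire the lock, do the work, then commit, which releases the lock automatically
MySQL: select…for update
The lock lives inside the transaction. The queried column must use an index for a row lock to be taken; otherwise every scanned row is locked, which effectively locks the table and blocks other concurrent logic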
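- Optimistic lock, using a version number column and a status column
Lock: read the current row and remember its version, check the status, then set the version to version+1 and the status to running, on the condition that the stored version is still the one that was read
Unlock: update the status to finished
Requires an extra version column; locking must account for lock timeout, and unlocking must not affect locks held by other concurrent callers
The ABA problem remains wherever the compared value can repeat, for example if only the status is compared or a version number is reused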
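The optimistic-lock steps above can be sketched without a database. This is a minimal in-memory illustration, not the implementation used later in this note: the class VersionedLockRow, its Row holder, and the status strings are hypothetical names, and AtomicReference.compareAndSet stands in for an UPDATE whose WHERE clause checks the old version.

```java
import java.util.concurrent.atomic.AtomicReference;

/** In-memory stand-in for one lock row; a real system would keep this in a database table. */
final class VersionedLockRow {
    /** Immutable snapshot of the row: version, status, and last update time. */
    private static final class Row {
        final long version;
        final String status;
        final long updateTime;

        Row(long version, String status, long updateTime) {
            this.version = version;
            this.status = status;
            this.updateTime = updateTime;
        }
    }

    private final AtomicReference<Row> row = new AtomicReference<>(new Row(0L, "FINISH", 0L));

    /**
     * Tries to take the lock. Returns the new version on success, or null when the
     * lock is held and not yet timed out, or when another caller changed the row first.
     */
    Long tryLock(long now, long timeoutMillis) {
        Row current = row.get(); // read the row and remember its version
        boolean running = "RUNNING".equals(current.status);
        if (running && now - current.updateTime < timeoutMillis) {
            return null; // held by someone else and still fresh
        }
        Row next = new Row(current.version + 1, "RUNNING", now);
        // Succeeds only if nobody replaced the row since it was read
        return row.compareAndSet(current, next) ? next.version : null;
    }

    /** Releases the lock only if the caller still owns the version it was given. */
    boolean unlock(long ownedVersion, long now) {
        Row current = row.get();
        if (current.version != ownedVersion || !"RUNNING".equals(current.status)) {
            return false; // the lock was taken over after a timeout
        }
        return row.compareAndSet(current, new Row(ownedVersion, "FINISH", now));
    }
}
```

In this sketch a caller whose lock timed out and was taken over gets false from unlock, so it cannot release the new holder's lock.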
Middleware-based
- Redis
- ZK
Both rely on competing for a unique key, combined with a timeout mechanism
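The "unique key plus timeout" idea can be illustrated without any middleware. The sketch below is an in-memory stand-in only: ExpiringKeyLock, its method names, and the owner strings are hypothetical. With Redis, the same effect is usually obtained with a single SET key value NX PX milliseconds command, where the key expires on its own.

```java
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for a store that supports "set if absent, with expiry". */
final class ExpiringKeyLock {
    /** Who holds the key and when the claim expires. */
    private static final class Entry {
        final String owner;
        final long expiresAt;

        Entry(String owner, long expiresAt) {
            this.owner = owner;
            this.expiresAt = expiresAt;
        }
    }

    private final ConcurrentHashMap<String, Entry> keys = new ConcurrentHashMap<>();

    /** Claims the key if it is absent or expired; returns true when this caller now holds it. */
    boolean tryAcquire(String key, String owner, long now, long ttlMillis) {
        Entry candidate = new Entry(owner, now + ttlMillis);
        // compute() runs atomically per key, so only one caller can install its entry
        Entry winner = keys.compute(key, (k, existing) ->
                (existing == null || existing.expiresAt <= now) ? candidate : existing);
        return winner == candidate;
    }

    /** Removes the key only if it is still held by the given owner. */
    boolean release(String key, String owner) {
        Entry existing = keys.get(key);
        // remove(key, value) is atomic and matches this exact entry (identity equality)
        return existing != null && existing.owner.equals(owner) && keys.remove(key, existing);
    }
}
```

The owner check in release matters: without it, a caller whose claim expired could delete a key that another instance has since acquired.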
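Application
DB-based, optimistic lock
Table structure
- The task name task_name is the unique index (here, the primary key)
- Optimistic lock: a version column holds the version number
- task_status records the task status and is used to detect task conflicts
- update_time is the last update time and is used to detect lock timeout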
CREATE TABLE IF NOT EXISTS `distribute_task_lock` (
`task_name` varchar(512) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`task_status` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`version` bigint(20) NOT NULL,
`update_time` bigint(20) NOT NULL,
`start_time` bigint(20) NOT NULL,
`finish_time` bigint(20) NULL DEFAULT NULL,
PRIMARY KEY (`task_name`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
Acquiring the lock
public Long lock(String taskName, long taskTimeout) {
    ......
    // 1. Load the lock row; if none exists yet, one is created
    DistributeTaskLock distTask = getOrCreateTask(taskName);
    if (distTask == null) {
        log.error("cannot find task {} in DB", taskName);
        return null;
    }
    try {
        // 2. Timeout check: give up if the task is running and has not timed out
        long currentTime = System.currentTimeMillis();
        if (DistributedTaskStatusEnum.RUNNING.name().equals(distTask.getTaskStatus())) {
            if ((currentTime - distTask.getUpdateTime()) < taskTimeout) {
                log.info("task {} is in running status, the task may have been started on another node.", taskName);
                return null;
            } else {
                log.warn("task {} timeout, continue to lock", taskName);
            }
        }
        // 3. Conditional on the original version: increment the version by 1 and set the status to running
        if (updateVersion(distTask, currentTime)) {
            // successfully locked
            log.info("task {} locked, ready to run.", distTask.getTaskName());
            return distTask.getVersion() + 1;
        }
        log.info("task {} lock failed, may have been started on another node.", distTask.getTaskName());
    } catch (Exception e) {
        // Pass the exception as the last argument so the logger records the stack trace
        log.error("get lock for task {} failed", taskName, e);
    }
    return null;
}
On success the method returns the lock version number; otherwise it returns null
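Releasing the lock
public void unlock(String taskName, Long version) {
    ......
    // 1. Load the lock row
    DistributeTaskLock distTask = getOrCreateTask(taskName);
    if (distTask == null) {
        log.warn("cannot find task {} in DB", taskName);
        return;
    }
    try {
        // 2. Update the lock state: set the status to finished.
        //    The version is kept as-is, except that it wraps to 0 when close to Long.MAX_VALUE.
        Long ver = version > Long.MAX_VALUE - 100 ? 0L : version;
        LambdaUpdateWrapper<DistributeTaskLock> updateWrapper = new LambdaUpdateWrapper<>();
        // Match on the version returned by lock(), so only the caller's own lock is released
        updateWrapper.eq(DistributeTaskLock::getTaskName, taskName)
                .eq(DistributeTaskLock::getVersion, version)
                .set(DistributeTaskLock::getVersion, ver)
                .set(DistributeTaskLock::getTaskStatus, DistributedTaskStatusEnum.FINISH.name())
                .set(DistributeTaskLock::getFinishTime, System.currentTimeMillis());
        // Assumes update(...) returns whether a row was changed, as MyBatis-Plus IService#update(Wrapper) does
        if (distributedTaskLockService.update(updateWrapper)) {
            log.info("task {} finished, lock released", taskName);
        } else {
            log.warn("task {} lock not released, version {} is no longer current", taskName, version);
        }
    } catch (Exception e) {
        // Pass the exception as the last argument so the logger records the stack trace
        log.error("release task {} failed", taskName, e);
    }
}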
Usage
// 1. Acquire the distributed lock; a timeout parameter can also be passed
Long version = distributedTaskLockUtil.lock(taskName);
// 2. Check whether the lock was acquired
if (ObjectUtils.isEmpty(version)) {
    return AppResponse.buildFailed(AppExceptionEnum.FAILED,
            AppExceptionEnum.FAILED.getCode(),
            "task {} is in running status, the task may have been started on another node.", taskName);
}
try {
    // 3. Run the business logic
    ......
} finally {
    // 4. Release the lock
    distributedTaskLockUtil.unlock(taskName, version);
}
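- The task name taskName must be globally unique
- Acquiring the lock: a timeout can be set (default 600s); once it elapses, the lock can be taken by another caller
- Releasing the lock: only the lock that was actually acquired is released, so locks acquired elsewhere are unaffected; the release must be placed in a finally block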
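If the acquire, check, try, finally sequence is repeated in many tasks, it could be wrapped once. The following is a sketch under assumptions: TaskLock is a hypothetical minimal interface mirroring the lock and unlock calls shown above, and LockedTaskRunner and runIfLocked are illustrative names that do not appear in the original code.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical minimal view of the lock utility: lock returns a version, or null on failure. */
interface TaskLock {
    Long lock(String taskName);

    void unlock(String taskName, Long version);
}

/** Runs a task under the distributed lock and always releases the lock afterwards. */
final class LockedTaskRunner {
    private final TaskLock taskLock;

    LockedTaskRunner(TaskLock taskLock) {
        this.taskLock = taskLock;
    }

    /**
     * Runs the task only if the lock is acquired. Returns an empty Optional when the
     * lock is held elsewhere (or when the task itself returns null).
     */
    <T> Optional<T> runIfLocked(String taskName, Supplier<T> task) {
        Long version = taskLock.lock(taskName);
        if (version == null) {
            return Optional.empty(); // another instance is running this task
        }
        try {
            return Optional.ofNullable(task.get());
        } finally {
            // Release with the version we were given, even if the task threw
            taskLock.unlock(taskName, version);
        }
    }
}
```

A caller would then write something like runner.runIfLocked(taskName, () -> doWork()) and treat an empty result as "already running on another node".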