分布式锁

2022/07/27 SystemDesign 共 3126 字,约 9 分钟

解决多实例执行任务时的并发控制

背景

进程内锁,synchronized、lock对象,无法解决进程间、实例间的并发控制

方案

基于DB

  • 锁表,基于唯一键

加锁:INSERT INTO database_lock(resource, description) VALUES (1, 'lock');

解锁:DELETE FROM database_lock WHERE resource=1;

简单但不可靠,没有锁清理机制

  • 悲观锁,基于事务,先起事务获取锁,执行动作,最后事务提交自动释放锁

Mysql:select…for update

基于事务,字段必须走索引才会用到行锁,否则会锁表,影响其他并发逻辑

  • 乐观锁,使用字段版本号和状态

加锁:获取当前值并记录版本号,检查状态,更新版本号+1和状态ing,条件是原版本号没变

解锁:更新状态完成即可

增加版本号字段,加锁时需要考虑锁超时,解锁时需要避免影响其他并发逻辑

无法解决ABA问题

基于中间件

  • Redis
  • ZK

基于唯一key抢占逻辑,附带超时机制

应用

基于DB,乐观锁

表结构

  1. 任务名字task_name做唯一索引
  2. 乐观锁,增加版本号version字段
  3. task_status记录任务状态,用来检查任务冲突
  4. update_time更新时间,用来检查锁超时
CREATE TABLE IF NOT EXISTS `distribute_task_lock`  (
  `task_name` varchar(512) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `task_status` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `version` bigint(20) NOT NULL,
  `update_time` bigint(20) NOT NULL,
  `start_time` bigint(20) NOT NULL,
  `finish_time` bigint(20) NULL DEFAULT NULL,
  PRIMARY KEY (`task_name`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

获取锁

    public Long lock(String taskName, long taskTimeout) {
......
// 1. 获取锁对象,无锁时自带创建锁内容
        DistributeTaskLock distTask = getOrCreateTask(taskName);
        if (distTask == null) {
            log.error("can not found task {} from DB", taskName);
            return null;
        }
        try {
            // 2. 超时判断,条件:运行中&未超时
            long currentTime = System.currentTimeMillis();
            if (DistributedTaskStatusEnum.RUNNING.name().equals(distTask.getTaskStatus())) {
                if ((currentTime - distTask.getUpdateTime()) < taskTimeout) {
                    log.info("task {} is in running status, the task may has been started in other node.", taskName);
                    return null;
                } else {
                    log.warn("task {} timeout, continue to lock", taskName);
                }
            }
            // 3. 获取成功时,基于原版本号更新版本号自加1,状态改为运行中
            if (updateVersion(distTask, currentTime)) {
                // successfully lock
                log.info("task {} lock, ready to run.", distTask.getTaskName());
                return distTask.getVersion() + 1;
            }
            log.info("task {} lock failed, may has been started at other node.", distTask.getTaskName());
        } catch (Exception e) {
            log.error("get lock for task {} failed: {}", taskName, e);
        }
        return null;
    }

返回锁版本号

释放锁

    public void unlock(String taskName, Long version) {
......
        // 1. 获取锁信息
        DistributeTaskLock distTask = getOrCreateTask(taskName);
        if (distTask == null) {
            log.warn("can not found task {} from DB", taskName);
            return;
        }

        try {
            // 2. 更新锁状态,版本号自加1,状态改为完成
            // version cycle
            Long ver = version > Long.MAX_VALUE - 100 ? 0L : version;
            LambdaUpdateWrapper<DistributeTaskLock> updateWrapper = new LambdaUpdateWrapper<>();
            updateWrapper.eq(DistributeTaskLock::getTaskName, taskName)
                .eq(DistributeTaskLock::getVersion, version)
                .set(DistributeTaskLock::getVersion, ver)
                .set(DistributeTaskLock::getTaskStatus, DistributedTaskStatusEnum.FINISH.name())
                .set(DistributeTaskLock::getFinishTime, System.currentTimeMillis());
            distributedTaskLockService.update(updateWrapper);

            log.info("task {} finished, lock released", taskName);
        } catch (Exception e) {
            log.error("release task {} failed: {}", taskName, e);
        }
    }

使用

// 1. 获取分布式锁,支持设置超时时间参数
        Long version = distributedTaskLockUtil.lock(taskName);

// 2. 判断锁是否获取成功
        if (ObjectUtils.isEmpty(version)) {
            return AppResponse.buildFailed(AppExceptionEnum.FAILED,
                AppExceptionEnum.FAILED.getCode(),
                "task {} is in running status, the task may has been started in other node.", taskName);
        }
        try {
            // 3. 执行业务逻辑
            ......
        } finally {
            // 4. 释放锁
            distributedTaskLockUtil.unlock(taskName, version);
        }
  1. 任务名taskName全局唯一
  2. 获取锁,可设置超时时间,默认超时600s,超时后可被其他地方锁定
  3. 释放锁,仅释放获取到的锁,不影响其他地方获取的锁,必须放到finally块中

Search

    Table of Contents