Welcome everyone

史上最全分布式锁调研

分布式 汪明鑫 368浏览 2评论

单机多线程竞争共享资源

在单机情况下解决共享资源的竞争问题,用jdk自带的类即可

 

 

但是在分布式环境下就没这么简单了

 

什么是分布式

单机应用已经不能满足需求,服务部署到不同的机器

搭建高可用、可扩展的分布式集群

 

到了分布式系统的时代,线程之间的锁机制就没作用了,系统有多份并且部署在不同的机器上,这些资源已经不是在线程之间共享了,而是属于进程之间共享的资源。

分布式的场景下就会出现一些问题,比如session共享,分布式事务,分布式下操作共享资源

随着分布式的出现和流行,一些理论、技术、中间件应运而生

 

 

为什么需要分布式锁

传统的单机应用上锁已经不能解决分布式下对共享资源的竞争

分布式锁解决分布式环境下对共享资源的竞争
分布式锁是在分布式环境下,保护跨进程、跨主机、跨网络的共享资源,实现互斥访问,保证一致性

 

 

引入分布式锁后:

 

 

 

分布式锁应用场景

分布式环境下会出现资源竞争的地方都需要分布式锁

12306 抢火车票的高并发场景就需要锁机制,还有类似共享文档平台比如wiki编辑问题

在秒杀场景下,单机应用架构中,秒杀案例使用ReentrantLcok或者synchronized来达到秒杀商品互斥的目的。然而在分布式系统中,会存在多台机器并行去实现同一个功能。也就是说,在多进程中,如果还使用以上JDK提供的进程锁,来并发访问数据库资源就可能会出现商品超卖的情况,这时候就可以使用分布式锁。

 

2个人都想操作wiki里的一篇文档

逻辑代码

 //根据docid获取文件内容,从分布式文件系统取,时间不可控
    nowFileContent = getFileByDocId(docId)    //do something,类似diff,追加操作
    newFileContent = doSomeThing()    //存储到文件系统
    setNewFileContent(docId,newFileContent)

 

A、C 两个请求同时到达代码段,但是由于网络原因,A 先拿到文档内容,C 在 A 写入前读到文件内容,所以最终的结果是两者会丢失一个写入。

所以需要对读写操作做一次加锁,保证事务的完整、一致。

希望可以达到下面的效果

Wiki 这类场景属于长耗时事务的资源处理问题,锁的出现保证不会因为事务中的读写间跨度耗时大导致写覆盖的情况,使得请求排队,顺序处理。

 

 

分布式锁的实现方式

  • 数据库 (悲观锁、乐观锁)
  • 基于zookeeper
  • 基于redis

 

数据库的实现方式相对简单,zk的实现方式可靠性更高,但需要搭建zk集群,熟悉zk api,学习成本高,

redis基于内存,效率高。

对于不同方式实现的分布式锁之间的比较,因为水平有限也接触的很少不好多说,但是没有最好的,只有更适合的

技术选型可以根据部门和业务来定。

 

 

数据库的方式实现分布式锁

 

乐观锁

乐观锁机制其实就是在数据库表中引入一个版本号(version)字段来实现的。
当我们要从数据库中读取数据的时候,同时把这个version字段也读出来,如果要对读出来的数据进行更新后写回数据库,则需要将version加1,同时将新的数据与新的version更新到数据表中,且必须在更新的时候同时检查目前数据库里version值是不是之前的那个version,如果是,则正常更新。如果不是,则更新失败,说明在这个过程中有其它的进程去更新过数据了。

有点类似java的CAS

如图,假设同一个账户,用户A和用户B都要去进行取款操作,账户的原始余额是2000,用户A要去取1500,用户B要去取1000,如果没有锁机制的话,在并发的情况下,可能会出现余额同时被扣1500和1000,导致最终余额的不正确甚至是负数。但如果这里用到乐观锁机制,当两个用户去数据库中读取余额的时候,除了读取到2000余额以外,还读取了当前的版本号version=1,等用户A或用户B去修改数据库余额的时候,无论谁先操作,都会将版本号加1,即version=2,那么另外一个用户去更新的时候就发现版本号不对,已经变成2了,不是当初读出来时候的1,那么本次更新失败,就得重新去读取最新的数据库余额。
通过上面这个例子可以看出来,使用「乐观锁」机制,必须得满足:
(1)锁服务要有递增的版本号version
(2)每次更新数据的时候都必须先判断版本号对不对(关键点),然后再写入新的版本号

除了用version版本号,也可以用时间戳

 

悲观锁

首先我们需要一个数据库的表

CREATE TABLE `methodLock` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
  `desc` varchar(1024) NOT NULL DEFAULT '备注信息',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
)
 ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

 

基于MySQL的InnoDB引擎   select  …. for update

public boolean lock(){
    connection.setAutoCommit(false)
    while(true){
        try{
            result = select * from methodLock where method_name=xxx for update;
            if(result有值){
                return true;   
            }
        }catch(Exception e){

        }
        sleep(1000);  //沉睡一会再去尝试获取排他锁
    }
    return false;
}

public boolean unlock(){
    connection.commit();  
}

只有一个线程能成功,其他线程无法再在该行记录上增加排他锁,直到select for  ...  update成功的线程使用commit操作后。

数据库会在查询过程中给数据库表增加排他锁

InnoDB引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。这里我们希望使用行级锁,就要给method_name添加索引。

 

可以参考:https://blog.csdn.net/claram/article/details/54023216

 

 

 

还有一种方法,利用method_name的唯一性

可以使用insert语句来表示是否获取锁,delete语句表示是否释放锁

insert成功则表示获取锁成功,失败表示已经该方法已经被锁住

因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们可以认为操作成功的那个线程获得了该方法的锁

 

当我们要锁住某个方法时,执行以下SQL:

insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)

 

当方法执行完毕之后,想要释放锁的话,需要执行以下sql:

delete from methodLock where method_name ='method_name'

 

上面这种实现有以下几个问题:

1、这把锁依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
2、这把锁没有失效时间,一旦解决操作失败,就会导致记录一直在数据库中,其他线程无法在获得锁。
3、这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁的操作。
4、这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据库表中数据已经存在了。

当然,我们也可以有其它方式解决上面的问题:

1、数据库是单点?搞mysql集群
2、没有失效时间?可以单独写一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
3、非阻塞?可以写一个while循环,直到insert成功再返回成功。 还可以设置一个重试时间,超过重试时间就不再尝试insert
4、非重入?可重入主要实现思路是,在每次获取锁之前去取当前锁的信息,如果锁的线程是当前线程,那么更新锁的count+1,并且执行锁之后的逻辑。如果不是当前锁,那么进行重试。释放的时候也要进行count-1,最后减到0时,删除锁标识释放锁。

 

 

基于zookeeper实现分布式锁

zk集群是一个cp系统

利用他的有序临时节点以及watcher机制可以实现分布式锁

 

/lock是我们用于加锁的目录,/resource_name是我们锁定的资源,其下面的节点按照我们加锁的顺序排列。

 

具体实现分析:

在zookeeper中创建一个根节点(Locks),用于后续各个客户端的锁操作

想要获取锁的client都在Locks路径下创建一个临时有序节点,

每个client得到一个序号,相当于排队

如果自己的序号是最小的,就成功获得锁

如果不是最小的,就监听自己前面的那个序号

前面的那个序号节点删除后就会触发一个通知

告诉轮到自己获取锁了,因为比我小的都被删了,

我就算本轮序号最小的节点,

毕竟风水轮流转

 

 

 

 

释放锁的过程 将自己对应的节点删除即可,下一个排队的节点就可以收到通知,从而被唤醒得到锁

例如client A需要释放锁,只需要把对应的节点1删除掉,因为client B已经关注了节点1,那么当节点1被删除后,zookeeper就会通知client B,B对应的序号就是最小的了,就表示获取锁了

 

部分代码实现(关键思想是这样的,但写法很多)

public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            //创建临时有序子节点
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(myZnode + " is created ");
            //取出所有子节点
            List<String> subNodes = zk.getChildren(root, false);
            //取出所有lockName的锁
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);  //排序
            System.out.println(myZnode + "==" + lockObjNodes.get(0));
            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //如果是最小的节点,则表示取得锁
                return true;
            }
            //如果不是最小的节点,找到比自己小1的节点
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }

详细代码可见  https://blog.csdn.net/peace1213/article/details/52571445

 

其实可以直接使用Curator封装的zookeeper分布式锁实现

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
public static void main(String[] args) throws Exception {
    //创建zookeeper的客户端
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("ip1:port,ip2:port,ip3:port", retryPolicy);
    client.start();

    //创建分布式锁, 锁空间的根节点路径为/curator/lock
    InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
    mutex.acquire();
    //获得了锁, 进行业务流程
    System.out.println("Enter mutex");
    //完成业务流程, 释放锁
    mutex.release();
    
    //关闭客户端
    client.close();
}

核心操作就只有mutex.acquire()和mutex.release(),都封装好了

其他的操作对我们来说都是透明的

 

基于redis实现分布式锁

基于缓存来实现在性能方面会表现的更好一点。

 

使用命令介绍:
(1)SETNX
SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。
(2)expire
expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。
(3)delete
delete key:删除key
在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。
实现思想:
(1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
(2)获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
(3)释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
/**
 * redis分布式锁的实现代码
 */
public class DistributedLock {

    private final JedisPool jedisPool;

    public DistributedLock(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    /**
     * 加锁
     * @param lockName       锁的key
     * @param acquireTimeout 获取超时时间
     * @param timeout        锁的超时时间
     * @return 锁标识
     */
    public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {
        Jedis conn = null;
        String retIdentifier = null;
        try {
            // 获取连接
            conn = jedisPool.getResource();
            // 随机生成一个value
            String identifier = UUID.randomUUID().toString();
            // 锁名,即key值
            String lockKey = "lock:" + lockName;
            // 超时时间,上锁后超过此时间则自动释放锁
            int lockExpire = (int) (timeout / 1000);

            // 获取锁的超时时间,超过这个时间则放弃获取锁
            long end = System.currentTimeMillis() + acquireTimeout;
            while (System.currentTimeMillis() < end) {
                if (conn.setnx(lockKey, identifier) == 1) {
                    conn.expire(lockKey, lockExpire);
                    // 返回value值,用于释放锁时间确认
                    retIdentifier = identifier;
                    return retIdentifier;
                }
                // 返回-1代表key没有设置超时时间,为key设置一个超时时间
                if (conn.ttl(lockKey) == -1) {
                    conn.expire(lockKey, lockExpire);
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retIdentifier;
    }

    /**
     * 释放锁
     * @param lockName   锁的key
     * @param identifier 释放锁的标识
     * @return
     */
    public boolean releaseLock(String lockName, String identifier) {
        Jedis conn = null;
        String lockKey = "lock:" + lockName;
        boolean retFlag = false;
        try {
            conn = jedisPool.getResource();
            while (true) {
                // 监视lock,准备开始事务
                conn.watch(lockKey);
                // 通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
                if (identifier.equals(conn.get(lockKey))) {
                    Transaction transaction = conn.multi();
                    transaction.del(lockKey);
                    List<Object> results = transaction.exec();
                    if (results == null) {
                        continue;
                    }
                    retFlag = true;
                }
                conn.unwatch();
                break;
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retFlag;
    }
}

 

分布式锁的一些问题

client1获取了锁并且设置了锁的超时时间,但是client1之后出现了STW,这个STW时间比较长,导致分布式锁进行了释放,client2获取到了锁,这个时候client1恢复了锁,那么就会出现client1,2同时获取到锁,这个时候分布式锁不安全问题就出现了。这个其实不仅仅局限于RedLock,对于我们的ZK,Mysql一样的有同样的问题。
时钟发生跳跃:对于Redis服务器如果其时间发生了向跳跃,那么肯定会影响我们锁的过期时间,那么我们的锁过期时间就不是我们预期的了,也会出现client1和client2获取到同一把锁,那么也会出现不安全,这个对于Mysql也会出现。但是ZK由于没有设置过期时间,那么发生跳跃也不会受影响。
长时间的网络I/O:这个问题和我们的GC的STW很像,也就是我们这个获取了锁之后我们进行网络调用,其调用时间由可能比我们锁的过期时间都还长,那么也会出现不安全的问题,这个Mysql也会有,ZK也不会出现这个问题。

 

这一块之后再研究下

 

美团分布式锁简单介绍

我们小组做的有一个分布式锁 cerberus
可以切换不同的引擎zk,tair,squirrel

squirrel是美团基于redis的封装的组件

 

 

小结

1,分布式锁解决分布式环境下对共享资源的竞争
2,区分单机和分布式下对共享资源的竞争
3,分布式锁的应用场景
4,分布式锁可以由数据库、zookeeper、redis实现
5,三种实现方式在今后的学习工作中需要自己一一实现下
6,分析对比三种实现方式的优缺点
7,看Curator的分布式锁实现的源码
8,看美团分布式锁源码
9,实现功能比较完善的分布式锁

转载请注明:汪明鑫的个人博客 » 史上最全分布式锁调研

喜欢 (0)

说点什么

2 评论 在 "史上最全分布式锁调研"

提醒
avatar
排序:   最新 | 最旧 | 得票最多
dsfdasf
游客

大佬牛逼,无解

wpDiscuz