上一篇 从0开始的高并发(一)--- zookeeper的基础概念 咱们在结尾留下了一个分布式锁的坑,它保证了咱们在多节点应用的一次调度还有解决分布式环境下的数据一致性的问题html
好比咱们如今拥有这么一个集群,集群里面有个缓存服务,集群中每一个程序都会用到这个缓存,若是此时缓存中有一项缓存过时了,在大并发环境下,同一时刻中许许多多的服务都过来访问缓存,获取缓存中的数据,发现缓存过时,就要再去数据库取,而后更新到缓存服务中去。可是其实咱们仅仅只须要一个请求过来数据库去更新缓存便可,而后这个场景,咱们该怎么去作java
咱们参考多线程的场景下会使用到锁的这个方法,放到如今的并发场景下,咱们也是须要经过一种锁来实现。node
排他(互斥)性:只有一个线程能获取到
文件系统(同一个文件不支持多我的去修改)
数据库:主键惟一约束 for update
缓存:redis setnx命令
zookeeper:相似文件系统
阻塞性:其余未抢到的线程阻塞,直到锁被释放再进行抢这个行为
可重入性:线程获取锁后,后续是否可重复得到该锁
复制代码
同一个父目录下面不能有相同的子节点,这就是zookeeper的排他性
经过JDK的栅栏来实现阻塞性
可重入性咱们能够经过计数器来实现
复制代码
1.接口难以使用
2.链接zookeeper超时不支持自动重连
3.watch注册一次会失效,须要反复注册
4.不支持递归建立节点(递归建立的话,比方说我要建立一个文件,假如我在idea建立,那我能够连带着包一块儿建立,可是在window我就作不到,这种整一个路径一并建立下来的就能够视为递归建立)
5.须要手动设置序列化的问题
复制代码
org.apache.zookeeper
org.apache.zookeeper.data
connect---链接到zookeeper集合
create---建立znode
exist---检查znode是否存在及其信息
getData---从特定的znode获取数据
setData---从特定的znode设置数据
getChildren---获取特定znode中的全部子节点
delete===删除特定znode及其全部子项
close---关闭链接
复制代码
MyZkSerializer.javaredis
public class MyZkSerializer implements ZkSerializer {
//正常来讲咱们还须要进行一个非空判断,这里为了省事没作,不过严格来讲是须要作的
//就是简单的转换
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
String d = (String) data;
try {
return d.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
}
复制代码
ZkClientDemo.java数据库
public class ZkClientDemo {
public static void main(String[] args) {
// 建立一个zk客户端
ZkClient client = new ZkClient("localhost:2181");
//实现序列化接口
client.setZkSerializer(new MyZkSerializer());
//建立一个节点zk,在zk节点下再建立一个子节点app6,赋值123
//在以前也已经提到了,zookeeper中的节点既是文件夹也是文件
//源码中CreateMode是一个枚举,CreateMode.PERSISTENT---当客户端断开链接时,znode不会自动删除
client.create("/zk/app6", "123", CreateMode.PERSISTENT);
client.subscribeChildChanges("/zk/app6", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath+"子节点发生变化:"+currentChilds);
}
});
//这里开始是建立一个watch,可是为何这个方法会命名为subscribeDataChanges()呢,缘由是:
//本来watch的设置而后获取是仅一次性的,如今咱们使用subscribe这个英文,表明订阅,表明这个watch一直存在
//使用这个方法咱们能够轻易实现持续监听的效果,比原生zookeeper方便
client.subscribeDataChanges("/zk/app6", new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(dataPath+"节点被删除");
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println(dataPath+"发生变化:"+data);
}
});
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
复制代码
调用ls /zk---能够发现app6已经被建立,apache
经过get /zk/app6---可获取到咱们设置的123这个值缓存
说明咱们的程序没有问题,能够成功执行网络
这里测试监听事件多线程
create /zk/app6/tellYourDream时---控制台打印/zk/app6子节点发生变化:[tellYourDream]并发
delete /zk/app6/tellYourDream---控制台打印/zk/app6子节点发生变化:[],此时已经不存在任何节点,因此为空
set /zk/app6 123456---/zk/app6发生变化:123456
delete /zk/app6---同时触发了两个监听事件,/zk/app6子节点发生变化:null 和 /zk/app6节点被删除
1.持久化节点:不删除节点永远存在。且能够建立子节点
/**
* The znode will not be automatically deleted upon client's disconnect.
* 持久无序
*/
PERSISTENT (0, false, false),
/**
* The znode will not be automatically deleted upon client's disconnect,
* and its name will be appended with a monotonically increasing number.
* 持久有序
*/
PERSISTENT_SEQUENTIAL (2, false, true),
复制代码
2.非持久节点,换言之就是临时节点,临时节点就是客户端链接的时候建立,客户端挂起的时候,临时节点自动删除。不能建立子节点
/**
* The znode will be deleted upon the client's disconnect.
* 临时无序
*/
EPHEMERAL (1, true, false),
/**
* The znode will be deleted upon the client's disconnect, and its name
* will be appended with a monotonically increasing number.
* 临时有序
*/
EPHEMERAL_SEQUENTIAL (3, true, true);
复制代码
还有更多的一些监听方法,咱们能够本身去尝试一下。
咱们以前有提到,zookeeper中同一个子节点下面的节点名称是不能相同的,咱们能够利用这个互斥性,就能够实现分布式锁的工具
临时节点就是建立的时候存在,消失的时候,节点自动删除,当客户端失联,网络不稳定或者崩溃的时候,这个经过临时节点所建立的锁就会自行消除。这样就能够完美避免死锁的问题。因此咱们利用这个特性,实现咱们的需求。
原理其实就是节点不可重名+watch机制。
好比说咱们的程序有多个服务实例,哪一个服务实例都去建立一个lock节点,谁建立了,谁就得到了锁,剩下咱们没有建立的应用,就去监听这个lock节点,若是这个lock节点被删除掉,这时可能出现两种状况,一就是客户端连不上了,另外一种就是客户端释放锁,将lock节点给删除掉了。
public class ZkDistributeLock implements Lock {
//咱们须要一个锁的目录
private String lockPath;
//咱们须要一个客户端
private ZkClient client;
//刚刚咱们的客户端和锁的目录,这两个参数怎么传进来?
//那就须要咱们的构造函数来进行传值
public ZkDistributeLock(String lockPath) {
if(lockPath ==null || lockPath.trim().equals("")) {
throw new IllegalArgumentException("patch不能为空字符串");
}
this.lockPath = lockPath;
client = new ZkClient("localhost:2181");
client.setZkSerializer(new MyZkSerializer());
}
复制代码
实现Lock接口要重写的方法(包括尝试建立临时节点tryLock(),解锁unlock(),上锁lock(),waitForLock()实现阻塞和唤醒的功能方法)
// trylock方法咱们是会尝试建立一个临时节点
@Override
public boolean tryLock() { // 不会阻塞
// 建立节点
try {
client.createEphemeral(lockPath);
} catch (ZkNodeExistsException e) {
return false;
}
return true;
}
@Override
public void unlock() {
client.delete(lockPath);
}
@Override
public void lock() {
// 若是获取不到锁,阻塞等待
if (!tryLock()) {
// 没得到锁,阻塞本身
waitForLock();
// 从等待中唤醒,再次尝试得到锁
lock();
}
}
private void waitForLock() {
final CountDownLatch cdl = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("----收到节点被删除了-------------");
//唤醒阻塞线程
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data)
throws Exception {
}
};
client.subscribeDataChanges(lockPath, listener);
// 阻塞本身
if (this.client.exists(lockPath)) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消注册
client.unsubscribeDataChanges(lockPath, listener);
}
}
复制代码
ZkDistributeLock 如今咱们再总结一下流程
获取锁,建立节点后
1.成功获取到的---执行业务---而后释放锁
|
|
|
2.获取失败,注册节点的watch---阻塞等待---取消watch---再回到获取锁,建立节点的判断
复制代码
这个设计会有一个缺点,好比个人实例如今有无数个,此时咱们的lock每次被建立,有人获取了锁以后,其余的人都要被通知阻塞,此时咱们就浪费了不少的网络资源,也就是惊群效应。
此时咱们必须进行优化
咱们的Lock做为一个znode,也能够建立属于它的子节点,咱们使用lock建立临时顺序节点,咱们在从0开始的高并发(一)--- zookeeper的基础概念中已经提到了,zookeeper是有序的,临时顺序节点会自动进行由小到大的自动排序,此时咱们把实例分配至这些顺序子节点上,而后编号最小的获取锁便可。这很是相似于咱们的公平锁的概念,也是遵循FIFO原则的
原理:取号 + 最小号取lock + watch
一样是基于Lock接口的实现
public class ZkDistributeImproveLock implements Lock {
/*
* 利用临时顺序节点来实现分布式锁
* 获取锁:取排队号(建立本身的临时顺序节点),而后判断本身是不是最小号,如是,则得到锁;不是,则注册前一节点的watcher,阻塞等待
* 释放锁:删除本身建立的临时顺序节点
*/
//一样的锁目录
private String lockPath;
//一样的客户端
private ZkClient client;
private ThreadLocal<String> currentPath = new ThreadLocal<String>();
private ThreadLocal<String> beforePath = new ThreadLocal<String>();
// 锁重入计数器
private ThreadLocal<Integer> reenterCount = ThreadLocal.withInitial(()->0);
public ZkDistributeImproveLock(String lockPath) {
if(lockPath == null || lockPath.trim().equals("")) {
throw new IllegalArgumentException("patch不能为空字符串");
}
this.lockPath = lockPath;
client = new ZkClient("localhost:2181");
client.setZkSerializer(new MyZkSerializer());
if (!this.client.exists(lockPath)) {
try {
this.client.createPersistent(lockPath, true);
} catch (ZkNodeExistsException e) {
}
}
}
@Override
public boolean tryLock() {
System.out.println(Thread.currentThread().getName() + "-----尝试获取分布式锁");
if (this.currentPath.get() == null || !client.exists(this.currentPath.get())) {
//这里就是先去建立了一个临时顺序节点,在lockpath那里建立
//用银行取号来表示这个行为吧,至关于每一个实例程序先去取号,而后排队等着叫号的场景
String node = this.client.createEphemeralSequential(lockPath + "/", "locked");
//记录第一个节点编号
currentPath.set(node);
reenterCount.set(0);
}
// 得到全部的号
List<String> children = this.client.getChildren(lockPath);
// 把这些号进行排序
Collections.sort(children);
// 判断当前节点是不是最小的,和第一个节点编号作对比
if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
// 锁重入计数
reenterCount.set(reenterCount.get() + 1);
System.out.println(Thread.currentThread().getName() + "-----得到分布式锁");
return true;
} else {
// 取到前一个
// 获得字节的索引号
int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
String node = lockPath + "/" + children.get(curIndex - 1);
beforePath.set(node);
}
return false;
}
@Override
public void lock() {
if (!tryLock()) {
// 阻塞等待
waitForLock();
// 再次尝试加锁
lock();
}
}
private void waitForLock() {
final CountDownLatch cdl = new CountDownLatch(1);
// 注册watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(Thread.currentThread().getName() + "-----监听到节点被删除,分布式锁被释放");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
client.subscribeDataChanges(this.beforePath.get(), listener);
// 怎么让本身阻塞
if (this.client.exists(this.beforePath.get())) {
try {
System.out.println(Thread.currentThread().getName() + "-----分布式锁没抢到,进入阻塞状态");
cdl.await();
System.out.println(Thread.currentThread().getName() + "-----释放分布式锁,被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 醒来后,取消watcher
client.unsubscribeDataChanges(this.beforePath.get(), listener);
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName() + "-----释放分布式锁");
if(reenterCount.get() > 1) {
// 重入次数减1,释放锁
reenterCount.set(reenterCount.get() - 1);
return;
}
// 删除节点
if(this.currentPath.get() != null) {
this.client.delete(this.currentPath.get());
this.currentPath.set(null);
this.reenterCount.set(0);
}
}
复制代码
ps:不用担忧内存占满的问题,JVM会进行垃圾回收
这里对于curator就不作展开了,有兴趣能够本身去玩下
地址:curator.apache.org/curator-exa…
对于选举leader,锁locking,增删改查的framework等都有实现
距离上一篇的更新彷佛隔了好一段1时间,也是由于上周比较忙抽不出空子来,以后仍是会进行周更(尽力)
下一篇:从零开始的高并发(三)--- Zookeeper集群的leader选举