设为首页 收藏本站
查看: 638|回复: 0

[经验分享] 分布式助手Zookeeper(六)

[复制链接]

尚未签到

发表于 2017-4-19 08:47:31 | 显示全部楼层 |阅读模式
散仙,在关于zookeeper的前几篇文章中,除了记录了zookeeper的一些基础知识,也介绍了怎么使用zookeeper来完成,配置文件同步,和主从自动切换的功能,那么,本篇散仙将会介绍下如何使用,zookeeper来完成分布式锁的功能,其实本质上是与主从切换的实现代码是非常类似的,但是功能上强调的重点不一样。
至于,为什么需要分布式锁(公平锁)?为什么不使用JAVA 自带的锁的应用?
1,为什么需要分布式锁? 因为在分布式环境下,可能会出现一些事务,这时候我们除了可以在存储层的数据库进行控制,也可以在应用层控制,举个例子来讲,中国的飞机路线,我们都知道任何时候,都只能由一架飞机通过,而这个控制这个由谁通过,什么时候通过,是由一个信号控制台来决定的,分布式的环境下由于节点分散在各个地方,各个区域,所以控制起来比较麻烦,这时候我们就可以使用zookeeper来轻松的完成,分布式锁的功能。
2,为什么不使用JAVA自带的锁?JAVA JDK提供了公平锁,与非公平锁,但这种实现是基于同一个JVM来说的,如果同一台机器上,不同的JVM,则可以使用文件锁,来实现,但是这些并不是分布式的模式,虽然可以通过RMI的方式来实现,但比较繁琐。

使用zookeeper来完成分布式锁的步骤如下:
序号内容
1创建一个持久znode
2多个程序并发的去zk服务上,创建一个短暂有时序性的节点路径。
3各个节点监听,比它小的里面,最大的节点的动态。
4如果发现,比它小的里面,最大的节点发生锁释放或退出,就自动接替为独占锁
5没发生改变的节点,继续重复步骤,2,3,4

拓扑图如下所示:

DSC0000.jpg
注意上图中的master指的就是,获取锁的实例,这其实跟集群环境里只能有一个master的道理一样。
代码如下:

package com.test;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
/***
* 基于zookeeper实现的
* 分布式公平锁
*
* @author qin dong liang
* QQ技术群交流:324714439
*
* */
public class Lock1  implements Watcher {

/**
* ZK实例
* */
private ZooKeeper zk;
/**原子计数锁,防止在zk没有连上前,执行CURD操作*/
private CountDownLatch down=new CountDownLatch(1);
public Lock1() {
// TODO Auto-generated constructor stub
}

public Lock1(String host)throws Exception {
this.zk=new ZooKeeper(host, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
/**链接上zk服务,岂可取消阻塞计数**/
if(event.getState()==KeeperState.SyncConnected){
down.countDown();
}
}
});
}
/**
* 字符编码
*
* **/
private static final Charset CHARSET=StandardCharsets.UTF_8;
/***
*
* 此方法是写入数据
* 如果不存在此节点
* 就会新建,已存在就是
* 更新
*
* **/
public void write(String path,String value)throws Exception{
Stat stat=zk.exists(path, false);
if(stat==null){
zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}else{
zk.setData(path, value.getBytes(CHARSET), -1);
}
}

/**
*
* 切换锁
*
* **/
public void check()throws Exception{
List<String> list=zk.getChildren("/a", null);
Collections.sort(list);//排序使得节点有次序
if(list.isEmpty()){
System.out.println("此父路径下面没有节点,分布式锁任务完成或还没启动!");
}else{
String start=list.get(0);//获取第一个节点
String data=new String(zk.getData("/a/"+start, false,null));
if(data.equals("a")){//等于本身就启动作为Master
if(list.size()==1){
startMaster();//作为Master启动
}else{
automicSwitch();//对于非第一个启动的节点,会调用此方法,因为他的第一个挂了
//或释放锁了,所以它是抢占的
}
}else{
//非当前节点,就打印当前节点,监控的节点
for(int i=0;i<list.size();i++){
//获取那个节点存的此客户端的模拟IP
String temp=new String(zk.getData("/a/"+list.get(i), false, null));
if(temp.equals("a")){
//因为前面作为首位判断,所以这个出现的位置不可能是首位
//需要监听小节点里面的最大的一个节点
String watchPath=list.get(i-1);
System.out.println("Lock1监听的是:  "+watchPath);
zk.exists("/a/"+watchPath, this);//监听此节点的详细情况,如果发生节点注销事件
//则会触发自身的process方法
break;//结束循环
}
}
}

}
}

@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
if(event.getType()==Event.EventType.NodeDeleted){
//如果发现,监听的节点,挂掉了,那么就重新,进行监听
try{
System.out.println("注意有锁退出或释放,公平锁开始抢占........");
check();
}catch(Exception e){
e.printStackTrace();
}
}
}
/**
*
* 读取数据,给定一个路径和
* 监听事件
*
* ***/
public String read(String path,Watcher watch)throws Exception{
byte[] data=zk.getData(path, watch, null);

return new String(data,CHARSET);
}
SimpleDateFormat f=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

/**
* 关闭zk连接
*
* **/
public void close()throws Exception{
zk.close();
}

/**
* 释放锁
* @throws Exception
*/
public void automicSwitch()throws Exception{
// System.out.println("有节点释放锁,Lock1锁占入.......,  时间  "+f.format(new Date()));
System.out.println("Lock1的上级锁节点退出或释放锁了,Lock1锁占入.......,  时间  "+f.format(new Date()));
}
/**
* 创建一个持久node,
*
* **/
public void createPersist()throws Exception{
zk.create("/a", "主节点".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("创建主节点成功........");

}
/***
* 创建锁node,注意是抢占 的
*
*
* */
public void createTemp()throws Exception{
zk.create("/a/b", "a".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Lock1注册锁成功,进入公平队列...........");
}
public static void main(String[] args)throws Exception {
//Slave s=new Slave("192.168.120.128:2181");
Lock1 lock=new Lock1("192.168.120.128:2181");
//  lock.createPersist();//创建主节点
lock.createTemp();//注册临时有序节点
lock.check();
Thread.sleep(Long.MAX_VALUE);
//lock.close();
}
/***
* 获取锁成功
*
* */
public void startMaster(){
System.out.println("Lock1节点获取锁了,其他节点等待........");
}
}


代码如上,所示,测试的时候,需要搭建一个3个节点的zookeeper集群,关于怎么搭建zookeeper集群,散仙前面的文章里有介绍,需要注意的是myid文件不要漏掉。
上面这个类,需要拷贝多份,并改变里面的节点的值,放在不同的eclipse中,进行模拟测试。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366136-1-1.html 上篇帖子: ZooKeeper源码分析(一) 下篇帖子: zookeeper使用原理探究
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表