设为首页 收藏本站
查看: 611|回复: 0

[经验分享] Zookeeper(八)分布式队列

[复制链接]
累计签到:2 天
连续签到:1 天
发表于 2015-9-6 13:13:40 | 显示全部楼层 |阅读模式
  

  1. element  方法  获取对列头部第一个元素

  查找队列znode 下所有的子节点名称   使用TreeMap给顺序编号排序  返回第一个znode对应的值
  

    public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren;
while(true){
try{
// 返回对列中的全部元素name  这里使用到了TreeMap
// map的key为znode的顺序编号  
orderedChildren = orderedChildren(null);
}catch(KeeperException.NoNodeException e){
throw new NoSuchElementException();
}
if(orderedChildren.size() == 0 ) throw new NoSuchElementException();
for(String headNode : orderedChildren.values()){
if(headNode != null){
try{
// 返回对列头部第一个znode对应数据.
return zookeeper.getData(dir+"/"+headNode, false, null);
}catch(KeeperException.NoNodeException e){
}
}
}
}
}
  
获取所有子节点名称
  
  

   private TreeMap<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>();
List<String> childNames = null;
try{
// 返回对列节点下   所有的子znode 名称
childNames = zookeeper.getChildren(dir, watcher);
}catch (KeeperException.NoNodeException e){
throw e;
}
// 所有子节点
for(String childName : childNames){
try{
//Check format
if(!childName.regionMatches(0, prefix, 0, prefix.length())){
LOG.warn("Found child node with improper name: " + childName);
continue;
}
String suffix = childName.substring(prefix.length());
// 顺序节点编号
Long childId = new Long(suffix);
orderedChildren.put(childId,childName);
}catch(NumberFormatException e){
LOG.warn("Found child node with improper format : " + childName + " " + e,e);
}
}
return orderedChildren;
}
  
  2.   remove  返回对列头部的第一个元素的值  并删除该znode
  

    public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren;
while(true){
try{
// 查找所有子节点  用TreeMap排序
orderedChildren = orderedChildren(null);
}catch(KeeperException.NoNodeException e){
throw new NoSuchElementException();
}
if(orderedChildren.size() == 0) throw new NoSuchElementException();
for(String headNode : orderedChildren.values()){
String path = dir +"/"+headNode;
try{
// 对列头部 第一个节点对应的数据
byte[] data = zookeeper.getData(path, false, null);
// 删除该节点
zookeeper.delete(path, -1);
return data;
}catch(KeeperException.NoNodeException e){
// Another client deleted the node first.
}
}
}
}
  
3. take 检索并移除对列头部一个znode对应的值  如果对列为空  则一直等待

  
  

    public byte[] take() throws KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren;
// Same as for element.  Should refactor this.
while(true){
LatchChildWatcher childWatcher = new LatchChildWatcher();
try{
// 查找所有对列元素 并给对列主znode 设置监视器
orderedChildren = orderedChildren(childWatcher);
}catch(KeeperException.NoNodeException e){
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
continue;
}
// 如果对列中不存在元素
// 对列节点下不存在子节点,  线程将一直等待. 使用到CountDownLatch
// 当客户端调用 offer方法 写入一个元素时  触发LatchChildWatcher监视器CountDownLatch 计数减1 为0时  当前线程获得运行机会
if(orderedChildren.size() == 0){
childWatcher.await();
continue;
}
for(String headNode : orderedChildren.values()){
String path = dir +"/"+headNode;
try{
// 返回对列头部第一个元素
byte[] data = zookeeper.getData(path, false, null);
// 删除该元素
zookeeper.delete(path, -1);
return data;
}catch(KeeperException.NoNodeException e){
// Another client deleted the node first.
}
}
}
}
  

    private class LatchChildWatcher implements Watcher {
CountDownLatch latch;
public LatchChildWatcher(){
latch = new CountDownLatch(1);
}
public void process(WatchedEvent event){
LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +
event.getState() + " type " + event.getType());
latch.countDown();
}
public void await() throws InterruptedException {
latch.await();
}
}
  
  4. offer  写入队列尾部  使用永久顺序节点
  

    public boolean offer(byte[] data) throws KeeperException, InterruptedException{
for(;;){
try{
// 写入对列    节点类型  永久顺序节点
zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}catch(KeeperException.NoNodeException e){
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
}
}
}
  
5.peek  返回对列头部  第一个znode对应的值
  
  

    public byte[] peek() throws KeeperException, InterruptedException{
try{
// 返回对列头部第一个znode的值
return element();
}catch(NoSuchElementException e){
return null;
}
}
  
6. 返回队列头部第一个znode对应的值  并删除该znode
  
  

    public byte[] poll() throws KeeperException, InterruptedException {
try{
// 返回对列znode下 第一个子节点值  并删除该节点
return remove();
}catch(NoSuchElementException e){
return null;
}
}
  

  
  
  
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-110159-1-1.html 上篇帖子: 解决客户端通过zookeeper连接到hbase时连接过多的问题 下篇帖子: Zookeeper(六)数据模型
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表