设为首页 收藏本站
查看: 755|回复: 0

[经验分享] zookeeper实战:SingleWorker代码样例

[复制链接]
累计签到:6 天
连续签到:1 天
发表于 2015-9-6 10:01:47 | 显示全部楼层 |阅读模式
  们需要一个“单点worker”系统,此系统来确保系统中定时任务在分布式环境中,任意时刻只有一个实例处于活跃;比如,生产环境中,有6台机器支撑一个应用,但是一个应用中有30个定时任务,这些任务有些必须被在“单线程”环境中运行(例如“数据统计”任务),避免并发的原因不是在java层面,可能是在操作db数据时,或者是在消息消费时,或者是信息推送时等。某个指标的“数据统计”任务,每天只需要执行一次,即使执行多次也是妄费,因为这种类型的定时任务,需要被“单点”。同时,如果一个任务在没有报告结果的情况下异常推出,我们仍然期望集群中其他实例能够主动“接管”它。在实现不良好的架构中,可能有些开发者使用手动触发特定脚本的方式执行,有些web项目可能是通过配置特定host的方式开启任务。对于某些定时任务,可能会采用quartz-cluster中的某些实现,但是他需要数据库的额外支持。
     此时,我们将使用zookeeper来实现此功能。本实例提供了如下功能展示:
    1) 提供了单点worker功能
    2) 提供了worker均衡能力(30个worker相对均匀的分配到6台机器上)
    3) 提供了worker失效接管能力。
     但是仍有很多亟待解决的问题:
    1) 无法确保任务的接管是及时的,即一个任务执行者失效,将会在一定的过期时间后,才会被其他sid接管
    2) 在极少的情况下,仍然会有一个任务同时被2个sid执行。
    3) 在极少的情况下,会有极短的时间内,一个任务不会被任何sid接管,处于“孤立”状态
    尽管zk提供了watch机制,但是上述问题,不仅不能完全避免,还会额外增加代码的复杂度。最终我个人放弃了对在此类中使用watch的想法。。
    注意:zk中exist和create/delete等操作并非原子,可能在exist返回false的情况下,去create此节点,也有可能抛出NoExistsException;你应该能够想到“并发”环境造成此问题的时机(其他zk客户端也有类似的操作,并发)。
    注意:在zk中删除父节点,将会导致子节点一并删除;同理,如果创建一个节点,那么它的各级父节点必须已经存在,且节点的层级越深,对zk底层存储而言数据结构越冗杂。

    数据结构与设计思路:
    1) serverType为当前应用标识,我们期望每个应用都有各自的serverType,方便数据分类; jobType为任务类型或者任务名称;如下全节点表示某个serverType的jobType下有sid1,sid2,sid3共三个实例(例如tomcat实例,或者物理机器标识)参与了此任务。zk节点路径格式:
        /severType/jobType/register/sid1
        ............................../sid2
        ............................../sid3
    2) 表示此jobType,被sid1运行。zk节点路径格式
        /severType/jobType/alive/sid1 挂载数据:null
    3) /serverType/jobType 挂载数据:cronException;将任务的“cron表达式”作为数据挂载
    4) {todu} 表示serverType下每个sid运行的任务个数,我们可以用来“均衡”任务,将新任务分配给任务较少的sid上。
        /serverType/sid1 挂载数据:任务个数.
  
  
  如下是本人的代码样例,实际生产环境中代码与样例有区别,此处仅供参考,本实例基于zookeeper + quartz 2.1,如有错误之处,请不吝赐教:
  1) TestMain.java :测试引导类
  2) PrintNumberJob.java:一个简单的任务,打印一个随即数字。
  3) PrintTimeJob.java:一个简单的任务,打印当前时间。
  4) SingleWorkerManager.java:核心类,用于处理调用者提交的任务,并确保结果符合预期。此类有2个内部工作线程组成,分别处理zk数据同步和用户任务交付等工作。
  
  很遗憾,源代码非常的长,尽管我已经足够细心的整理格式,但还是不够悦目,建议参阅者下载代码阅读,谢谢
  SingleWorkerManager.java
  

  • package com.sample.zk.singleWorker;  

  • /**  
  • *  
  • * @author qing
  • *
  • */  
  • public class SingleWorkerManager {  

  •     private static final String GROUP = "single-worker";  
  •     private Scheduler scheduler;  
  •     private ZooKeeper zkClient;  
  •     private String serverType = "_-default-_";//默认serverType类型,我祈祷不会有人估计它和一样  
  •     private static final String REGISTER = "/register";  
  •     private static final String ALIVE = "/alive";  

  •     private Watcher dw = new InnerZK();//default watcher;  

  •     private boolean isAlive = false;//是否可用  

  •     private Object tag = new Object();  

  •     private ReentrantLock lock = new ReentrantLock();  

  •     private String sid;//当前server标记,可以是IP等,主要用来表达如下描述:某sid上运行**任务;将**任务分配给某sid;  
  •     //真实场景下,可以为IP为192.168.197.2下tomcat运行printNumber任务;  
  •     //任务在哪个server上运行,需要有明确的信息才行,所以sid的设计需要很直观。  
  •     //  
  •     //很多时候,我们都是以ip地址来标记任务被运行的环境地址,不过在有些比较“穷苦的公司”,  
  •     //可能一个物理server下运行多个对等的tomcat实例  
  •     //或许这种方式下,使用ip作为标记,就一些麻烦了。  

  •     //已经在本地提交,但尚未提交给zk的任务,直到zk接受任务之后,提交任务者才返回  
  •     //+++++++++++++++++++++++++++++++++++++++++++  
  •     //如何设计任务提交,可能面临一个奇怪的选择,如下是2种队列:  
  •     //LinkedBlockingQueue提供了阻塞与非阻塞两种方式,非阻塞的方式允许任务提交这立即返回,但是此任务此后是否能够被zk  
  •     //正确接受将存在风险,有可能zk的故障,导致此任务无法正常运行。  
  •     //private BlockingQueue<Worker> outgoingWorker  = new LinkedBlockingQueue<Worker>();  
  •     //++++++++++++++++++++++++++++++++++++++++++++  
  •     //同步队列,是一个“单工”队列,如果任务任务被zk正确接受之后,任务提交者才返回,这是一个理想的情况。任务一旦开始被处理,任务提交者就可以返回了。  
  •     //如果任务提交时,刚好zk环境故障,那么此任务将会被重试多次,如果还未能成功,则失败。  
  •     //++++++++++++++++++++++++  
  •     //无论如何,你都需要做出一个选择,我选择了最直观的答案:SynchronousQueue + 同步  
  •     //++++++++++++++++++++++++  
  •     private SynchronousQueue<Worker> outgoingWorker = new SynchronousQueue<Worker>();  

  •     //当前serverType下所有的任务  
  •     private Map<String,Worker> allWorkers = new HashMap<String,Worker>();  

  •     //当前实例上所运行的任务,它是allWorkers的子集  
  •     private Map<String,Worker> selfWorkers = new HashMap<String,Worker>();  

  •     //用来间歇性的与zk进行同步,用来检测job的冲突或者新job的分配  
  •     private Thread syncThread;  
  •     //用于向zk提交任务数据的线程,将和SynchronousQueue协同工作  
  •     private Thread workerThread;  

  •     /**
  •      * 创建zk实例
  •      */  
  •     public SingleWorkerManager(String sid){  
  •         this(sid,null);  

  •     }

  •     public SingleWorkerManager(String sid,String sType){  
  •         if(sType != null){  
  •             this.serverType = sType;  
  •         }
  •         try{  
  •             zkClient = new ZooKeeper(Constants.connectString, 3000, dw,false);  
  •         }catch(Exception e){  
  •             e.printStackTrace();
  •             throw new RuntimeException(e);  
  •         }
  •         this.sid = sid;  
  •         syncThread = new Thread(new SyncHandler());  
  •         syncThread.setDaemon(true);  
  •         syncThread.start();
  •     }

  •     /**
  •      * 开启任务调度器
  •      */  
  •     public void start(){  
  •         try{  
  •             scheduler = StdSchedulerFactory.getDefaultScheduler();
  •             scheduler.start();
  •             workerThread = new Thread(new WorkerHandler());  
  •             workerThread.setDaemon(true);  
  •             workerThread.start();
  •             isAlive = true;  
  •             synchronized (tag) {  
  •                 tag.notifyAll();
  •             }

  •             //首次同步  
  •             sync();
  •         }catch(Exception e){  
  •             e.printStackTrace();
  •             throw new RuntimeException(e);//异常退出  
  •         }
  •     }

  •     /**
  •      * 关闭任务调度器,关闭zookeeper链接
  •      * 此后将导致任务被立即取消,singleWorkerManager实例将无法被重用
  •      */  
  •     public void close(){  
  •         lock.lock();
  •         try{  
  •             isAlive = false;  
  •             scheduler.shutdown();
  •             if (syncThread.isAlive()) {  
  •                 syncThread.interrupt();
  •             }
  •             if(workerThread.isAlive()){  
  •                 workerThread.interrupt();
  •             }
  •             if(zkClient != null){  
  •                 zkClient.close();
  •             }
  •         }catch(Exception e){  
  •             e.printStackTrace();
  •         }finally{  
  •             lock.unlock();
  •         }
  •     }

  •     /**
  •      * 取消job,将触发zk服务也“取消”此任务
  •      * @param jobName
  •      */  
  •     public void unschedule(String jobName){  
  •         try{  
  •             //here,zk  
  •             lock.unlock();
  •             try{  
  •                 String jobPath = "/" + serverType + "/" + jobName;  
  •                 Stat stat = zkClient.exists(jobPath, false);  
  •                 if(stat != null){  
  •                     zkClient.delete(jobPath, stat.getVersion());
  •                 }
  •             }catch(NoNodeException e){  
  •                 //ignore;  
  •             }catch(Exception e){  
  •                 e.printStackTrace();
  •             }
  •             //有syncHandler来取消本地任务  
  • //          //here,local scheduler  
  • //          //无论如何,本地都要取消  
  • //          TriggerKey key = new TriggerKey(jobName, GROUP);  
  • //          if(scheduler.checkExists(key)){  
  • //              scheduler.unscheduleJob(key);  
  • //          }  
  •         }catch(Exception e){  
  •             e.printStackTrace();
  •         }
  •     }

  •     /**
  •      * 提交任务,如果提交失败,将抛出异常
  •      * @param jobClass
  •      * @param cronExpression
  •      * @return true任务提交成功,false任务提交失败
  •      */  
  •     public boolean schedule(Class<? extends Job> jobClass,String cronExpression){  
  •         if(!isAlive){  
  •             throw new IllegalStateException("worker has been closed!");  
  •         }
  •         try{  
  •             Worker worker = this.build(jobClass, cronExpression);  
  •             return outgoingWorker.offer(worker,15,TimeUnit.SECONDS);//waiting here,最多15妙  
  •         }catch(Exception e){  
  •             e.printStackTrace();
  •             throw new RuntimeException(e);  
  •         }
  •     }

  •     private Worker build(Class<? extends Job> jobClass,String cronExpression){  
  •         String name = jobClass.getName();//全路径name  
  •         JobDetail job = JobBuilder.newJob(jobClass).withIdentity(name,GROUP).build();
  •         CronScheduleBuilder sb = CronScheduleBuilder.cronSchedule(cronExpression);//每两秒执行一次:"*/2 * * * * ?"  
  •         Trigger trigger = TriggerBuilder.newTrigger().withIdentity(name, GROUP).withSchedule(sb).build();
  •         return new Worker(job, trigger,cronExpression);  
  •     }


  •     ///////////////////////////////////////////////////////inner worker//////////////////////////////  

  •     /**
  •      * 当前实例的zkClient是否链接正常,scheduler是否处于可用状态
  •      * @return
  •      */  
  •     private boolean isReady(){  
  •         if(!isAlive){  
  •             return false;  
  •         }
  •         if(scheduler == null || zkClient == null){  
  •             return false;  
  •         }
  •         try{  
  •             if(scheduler.isShutdown() || !scheduler.isStarted()){  
  •                 return false;  
  •             }
  •         }catch(Exception e){  
  •             e.printStackTrace();
  •             return false;  
  •         }
  •         if(zkClient.getState().isConnected()){  
  •             return true;  
  •         }
  •         return false;  
  •     }

  •     /**
  •      * 同步selfWorkers列表,和zk环境中的列表进行比较,查看是否有任务冲突
  •      */  
  •     private void syncSelfWorker(){  
  •         lock.lock();
  •         try{  
  •             if(!isReady()){  
  •                 throw new RuntimeException("Scheduler error..");//以异常的方式中断  
  •             }
  •             //首先检测自己持有的任务列表,是否和zk一致,首次同步,selfWorkers肯定是空,需要sync后续去做调度。  
  •             for(String job : selfWorkers.keySet()){  
  •                 String jobPath = "/" + serverType + "/" + job;  
  •                 //如果此任务已经被远程取消,则取消本地job执行  
  •                 //所有的实例都会做同样的事情,一定会把那些“取消的任务”取消  
  •                 if(zkClient.exists(jobPath, false) == null){  
  •                     allWorkers.remove(job);
  •                     Worker cw = selfWorkers.remove(job);
  •                     if(cw != null){  
  •                         if(scheduler.checkExists(cw.getJob().getKey())){  
  •                             scheduler.unscheduleJob(cw.getTrigger().getKey());
  •                         }
  •                     }
  •                     continue;  
  •                 }
  •                 String alive = "/" + serverType + "/" + job + ALIVE;  
  •                 //查看是否有子节点冲突,比如一个job被多个server运行  
  •                 List<String> alives = zkClient.getChildren(alive, false);  
  •                 if(alives == null || alives.isEmpty()){  
  •                     //如果此任务尚未分配,则交付给workerHandler  
  •                     continue;  
  •                 }
  •                 if(alives.size() == 1){  
  •                     String holder = alives.get(0);  
  •                     //如果已分配且接管者是自己,更新时间  
  •                     if(holder.equalsIgnoreCase(sid)){  
  •                         byte[] data = String.valueOf(System.currentTimeMillis()).getBytes();  
  •                         zkClient.setData(alive + "/" + sid, data, -1);//ignore version  
  •                         continue;//如果是自己  
  •                     }
  •                 }
  •                 //对于其他情况,当前sid只能让步(有可能会存在所有的sid都让步,导致任务在极短时间内无法运行,  
  •                 //后台“补救”线程会做工作)  
  •                 if(zkClient.exists(alive + "/" + sid, false) != null){  
  •                     try{  
  •                         zkClient.delete(alive + "/" + sid, -1);  
  •                         scheduler.unscheduleJob(new TriggerKey(job, GROUP));  
  •                         selfWorkers.remove(job);
  •                     }catch(NoNodeException e){  
  •                         //ignore:  
  •                     }catch (Exception e) {  
  •                         e.printStackTrace();
  •                     }
  •                 }
  •             }
  •         }catch(Exception e){  
  •             e.printStackTrace();
  •         }finally{  
  •             lock.unlock();
  •         }
  •     }


  •     /**
  •      * 同步任务信息,将当前实例中scheduler运行的任务和zk进行比较,进行冲突检测。
  •      * 1) 检测自己正在运行的任务,是否和zk中心中分配给自己的任务列表一致。
  •      * 2) 获得当前serverType下所有的任务列表
  •      *  
  •      */  
  •     private void sync(){  
  •         lock.lock();
  •         try{  
  •             if(!isReady()){  
  •                 throw new RuntimeException("Scheduler error..");  
  •             }
  •             //检测一级节点  
  •             Stat tstat = zkClient.exists("/" + serverType,false);  
  •             if(tstat == null){  
  •                 try{  
  •                     zkClient.create("/" + serverType, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
  •                 }catch(NodeExistsException e){  
  •                     //ignore  
  •                 }
  •             }
  •             //+++++++++++++++++++  
  •             syncSelfWorker();
  •             //+++++++++++++++++++  

  •             //获得所有任务列表  
  •             List<String> allJobs = zkClient.getChildren("/" + serverType, false);  
  •             if(allJobs == null){  
  •                 throw new RuntimeException("NO jobs, error..");//以异常的方式,终端方法调用,没有别的意思。  
  •             }
  •             allWorkers.clear();//reload all  
  •             for(String job : allJobs){  
  •                 try{  
  •                     //job为类的全名,节点下挂载的数据为cronException  
  •                     byte[] data = zkClient.getData("/" + serverType + "/" + job, false, null);  
  •                     if(data == null || data.length == 0){  
  •                         continue;  
  •                     }

  •                     //简单考虑吧,不过作为一名合格的程序员,此处可能需要太多的校验。  
  •                     Class<? extends Job> jobClass = (Class<? extends Job>)ClassLoader.getSystemClassLoader().loadClass(job);  
  •                     Worker worker = build(jobClass, new String(data));  
  •                     allWorkers.put(job,worker);
  •                     //自己检测到任务后,注册自己  
  •                     String registerPath = "/" + serverType + "/" + job + REGISTER + "/" + sid;  
  •                     //如果不存在  
  •                     if(zkClient.exists(registerPath, false) == null){  
  •                         try{  
  •                             zkClient.create(registerPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
  •                         }catch(NodeExistsException ex){  
  •                             //ignore;如果自己已经注册过,则忽略  
  •                         }
  •                     }
  •                     //检测此worker是否为自己所持有  
  •                     String alivePath = "/" + serverType + "/" + job + ALIVE +"/" + sid;  
  •                     //如果此任务不属于自己运行,则继续  
  •                     if(zkClient.exists(alivePath, false) == null){  
  •                         continue;  
  •                     }
  •                     //如果属于自己运行,则开启任务,本地是否开启任务,完全取决于zk的数据状态  
  •                     try{  
  •                         boolean exists = scheduler.checkExists(worker.getJob().getKey());  
  •                         if(!exists){  
  •                             //如果尚未在当前实例中调度,则立即调度  
  •                             scheduler.scheduleJob(worker.getJob(),worker.getTrigger());
  •                             selfWorkers.put(job,worker);
  •                         }
  •                     }catch(Exception e){  
  •                         e.printStackTrace();
  •                         zkClient.delete(alivePath, -1);//ignore version;  
  •                         //再次校验  
  •                         selfWorkers.remove(job);
  •                     }
  •                 }catch(ClassNotFoundException e){  
  •                     e.printStackTrace();
  •                     throw new RuntimeException(e);  
  •                 }
  •             }

  •         }catch(Exception e){  
  •             e.printStackTrace();
  •         }finally{  
  •             lock.unlock();
  •         }
  •     }


  •     class InnerZK implements Watcher {  

  •         public void process(WatchedEvent event) {  
  •             // 如果是“数据变更”事件  
  •             if (event.getType() != EventType.None) {  
  •                 //processExt(event);  
  •                 return;  
  •             }

  •             // 如果是链接状态迁移  
  •             // 参见keeperState  
  •             switch (event.getState()) {  
  •             case SyncConnected:  
  •                 System.out.println("Connected...");  
  •                 // 链接状态迁移时,检测worker信息  
  •                 sync();
  •                 break;  
  •             case Expired:  
  •                 System.out.println("Expired...");  
  •                 break;  
  •             // session过期  
  •             case Disconnected:  
  •                 // 链接断开,或session迁移  
  •                 System.out.println("Connecting....");  
  •                 break;  
  •             case AuthFailed:  
  •                 close();
  •                 throw new RuntimeException("ZK Connection auth failed...");  
  •             default:  
  •                 break;  
  •             }
  •         }

  •     }

  •     /**
  •      * 分配任务,在所有的worker信息都同步结束后,然后在逐个检测任务状态,对于没有
  •      * 被执行的新任务,或者已经失去托管的任务,交付给其他sid。
  •      *  
  •      * 任务分配,没有采取“严格均衡”的方式,我们使用了一个随即方式。
  •      */  
  •     private void scheduler(){  
  •         lock.lock();
  •         for(String job : allWorkers.keySet()){  
  •             try{  
  •                 //如果没有,则创建一个持久节点,挂载数据为,系统时间戳,你可以为此节点加上ACL控制,但会带来复杂度  
  •                 //这里可以创建为临时节点,那么你需要对此节点注册watch,当watch触发时(比如其他sid的session失效等)做job的接管  
  •                 //考虑到如果大量的job,大量的watch,在网络复杂的情况下,再加上对zk的并发操作,数据一致性是个问题。  
  •                 //此处,我们采取挂载“时间戳”的方式,在SyncHandler线程中,间歇性的去检测,惰性的非实时的分配和协调任务  
  •                 //此处就要求,你的应用服务器的时间,应该几乎非常一致,如果你无法做到,请在此处增加一个操作分支,从一个统一的地方获得时间:比如DB中等  
  •                 String alivePath = "/" + serverType + "/" + job + ALIVE;  
  •                 List<String> children = zkClient.getChildren(alivePath, false);//如果节点不存在,则在下一次sync时被补救  
  •                 if(children == null || children.isEmpty()){  
  •                     //此job尚未分配  
  •                     String registerPath = "/" + serverType + "/" + job + REGISTER;  
  •                     List<String> rc = zkClient.getChildren(registerPath, false);  
  •                     //等待下一次sync时准备节点数据  
  •                     if(rc == null || rc.isEmpty()){  
  •                         continue;  
  •                     }
  •                     Collections.shuffle(rc);//打乱顺序,随即,取出第一个,其实你可以有很多更好的手段来实现“任务均衡”,此处仅为参考  
  •                     String tsid = rc.get(0);  
  •                     try{  
  •                         byte[] data = String.valueOf(System.currentTimeMillis()).getBytes();  
  •                         zkClient.create(alivePath + "/" + tsid, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
  •                         //tsid对应的syncHandler此后将会检测并补救。此处只是分配给他。  
  •                         //如果tsid也是失去托管的,那么下一次sync检测将会发现并移除,此处不再做多余的校验;  
  •                         //在极端情况下,比如你的“任务托管过期时间”过短,或者你的系统发布过程很长,但是所有的任务都失去托管  
  •                         //那么最终将会有一台机器接管大部分job,如果job个数很多,将会出现“雪崩效应”;  
  •                         //如果你不能容忍这些事情的发生,请在此处增加有效的barrier操作(如果接管任务个数达到一定个数,将接受但不执行任务)  
  •                         //或者refuse操作(既不接管也不执行任务)。  
  •                         System.out.println("Job switch,SID:" + tsid + ",JOB :" + job);  
  •                     }catch(NodeExistsException e){  
  •                         //ignore;  
  •                     }
  •                     continue;  
  •                 }
  •                 //如果job已经被其他sid接管,那么检测接管者,是否处于活跃,如果存在多个子节点,其实是  
  •                 //一种异常情况,此处我们只做校验,冲突有sync解决  
  •                 for(String id : children){  
  •                     String tpath = alivePath + "/" + id;  
  •                     Stat stat = new Stat();  
  •                     byte[] data = zkClient.getData(tpath, false,stat);  
  •                     long time = Long.valueOf(new String(data));  
  •                     long current = System.currentTimeMillis();  
  •                     //如果一个任务,它的执行者在2分钟内都没有和zk交互(synSelfWorker方法中会更新time)  
  •                     //表明已经过期  
  •                     //为了便于测试,此处为15秒  
  •                     if(time + 1500 < current){  
  •                         try{  
  •                             zkClient.delete(tpath, stat.getVersion());
  •                         }catch(BadVersionException e){  
  •                             //ignore  
  •                         }catch(NoNodeException e){  
  •                             //ignore;  
  •                         }
  •                     }else{  
  •                         System.out.println(id + " :" + job);  
  •                     }
  •                 }
  •             }catch(Exception e){  
  •                 e.printStackTrace();
  •             }
  •         }
  •         lock.unlock();
  •     }
  •     /**
  •      * 任务同步线程,间歇性的检测zk持有的任务和本地任务是否一致
  •      * 并负责分配任务
  •      * @author qing
  •      *
  •      */  
  •     class SyncHandler implements Runnable {  

  •         public void run() {  
  •             try {  
  •                 int i = 0;  
  •                 int l = 10;  
  •                 while (true) {  
  •                     synchronized (tag) {  
  •                         try{  
  •                             while(!scheduler.isStarted()){  
  •                                 tag.wait();
  •                             }
  •                         }catch(Exception e){  
  •                             //  
  •                         }
  •                     }
  •                     System.out.println("Sync handler,running...tid: " + Thread.currentThread().getId());  
  •                     if (zkClient == null || (zkClient.getState() == States.NOT_CONNECTED || zkClient.getState() == States.CLOSED)) {  
  •                         lock.lock();
  •                         try {  
  •                             // 回话重建等异常行为  
  •                             zkClient = new ZooKeeper(Constants.connectString, 3000, dw, false);  
  •                             System.out.println("Reconnected success!...");  
  •                         } catch (Exception e) {  
  •                             e.printStackTrace();
  •                             i++;
  •                             Thread.sleep(3000 + i * l);// 在zk环境异常情况下,每3秒重试一次  
  •                         } finally {  
  •                             lock.unlock();
  •                         }
  •                         continue;  
  •                     }
  •                     if (zkClient.getState().isConnected()) {  
  •                         sync();//同步任务  
  •                         scheduler();//任务分配和过期检测  
  •                         Thread.sleep(3000);// 如果被“中断”,直接退出  
  •                         i = 0;  
  •                     }else{  
  •                         Thread.sleep(3000);  
  •                     }
  •                 }
  •             } catch (InterruptedException e) {  
  •                 System.out.println("SID:" + sid + ",SyncHandler Exit...");  
  •                 close();
  •             }

  •         }
  •     }

  •     /**
  •      * 调用者提交的任务,将会被同步的方式交付给zk。此线程就是负责从queue中获取调用者
  •      * 提交的job,然后依次在zk环境中生成节点数据。
  •      * @author qing
  •      *
  •      */  
  •     class WorkerHandler implements Runnable{  
  •         private Set<Worker> pending = new HashSet<Worker>();  
  •         private int count = 0;//max = 20;  

  •         /**
  •          * 将worker信息生成zk节点数据
  •          * @param worker
  •          * @return
  •          */  
  •         private boolean register(Worker worker){  
  •             lock.lock();
  •             //逐级创建其父节点  
  •             String jobName = worker.getJob().getKey().getName();
  •             try{  
  •                 Transaction tx = zkClient.transaction();//使用事务的方式  
  •                 String jobPath = "/" + serverType + "/" + jobName;  
  •                 if(zkClient.exists(jobPath, false) == null){  
  •                     tx.create(jobPath, worker.getCronExpression().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  •                 }
  •                 String registerPath = "/" + serverType + "/" + jobName+ REGISTER;  
  •                 if(zkClient.exists(registerPath, false) == null){  
  •                     tx.create(registerPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
  •                 }
  •                 String alivePath = "/" + serverType + "/" + jobName+ ALIVE;  
  •                 if(zkClient.exists(alivePath, false) == null){  
  •                     tx.create(alivePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
  •                 }
  •                 tx.create(registerPath + "/" + sid, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
  •                 tx.commit();
  •             }catch(NodeExistsException e){  
  •                 //ignore  
  •             }catch(Exception e){  
  •                 e.printStackTrace();
  •                 pending.add(worker);
  •                 //对于异常数据,添加到一个补充操作队列,如果在操作中出现异常,那么将会在  
  •                 //补充操作中得到再次校验  
  •             }
  •             lock.unlock();
  •             return true;  
  •         }

  •         public void run(){  
  •             try{  
  •                 while(true){  
  •                     synchronized (tag) {  
  •                         try{  
  •                             while(!scheduler.isStarted()){  
  •                                 tag.wait();
  •                             }
  •                         }catch(Exception e){  
  •                             //  
  •                         }
  •                     }
  •                     System.out.println("Worker handler,running...");  
  •                     if(zkClient != null && zkClient.getState().isConnected()){  
  •                         System.out.println("Register...");  
  •                         Worker worker = outgoingWorker.take();
  •                         register(worker);
  •                         if(!pending.isEmpty()){  
  •                             Thread.sleep(500);  
  •                             Iterator<Worker> it = pending.iterator();
  •                             while(it.hasNext()){  
  •                                 boolean isOk = register(it.next());  
  •                                 if(!isOk){  
  •                                     count++;
  •                                     Thread.sleep(1000);  
  •                                 }else{  
  •                                     count = 0;  
  •                                     it.remove();
  •                                 }
  •                                 //如果重试20次,仍无法成功,直接抛弃,非常遗憾  
  •                                 if(count > 20){  
  •                                     pending.clear();
  •                                 }
  •                             }
  •                         }

  •                     }else{  
  •                         Thread.sleep(1000);  
  •                     }
  •                 }

  •             }catch(InterruptedException e){  
  •                 System.out.println("SID:" + sid + ",WorkerHandler Exit...");  
  •                 close();
  •             }
  •         }
  •     }

  •     /**
  •      * 全部删除当前serverType下所有的任务
  •      */  
  •     public void clear(){  
  •         lock.lock();
  •         try{  
  •             if(zkClient != null && zkClient.getState().isConnected()){  
  •                 zkClient.delete("/" + serverType, -1);  
  •             }
  • //          if(scheduler != null && scheduler.isStarted()){  
  • //              for(Worker worker : selfWorkers.values()){  
  • //                  scheduler.unscheduleJob(worker.getTrigger().getKey());  
  • //              }  
  • //          }  
  • //          allWorkers.clear();  
  •         }catch(Exception e){  
  •             e.printStackTrace();
  •         }finally{  
  •             lock.unlock();
  •         }
  •     }

  • }
  
  其他辅助类,请参考附件中的源码,谢谢。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-110013-1-1.html 上篇帖子: ZooKeeper详解 下篇帖子: ZooKeeper Distributed模式
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表