设为首页 收藏本站
查看: 1372|回复: 0

[经验分享] elastic-job的原理简介和使用

[复制链接]

尚未签到

发表于 2019-1-29 13:11:48 | 显示全部楼层 |阅读模式
  转自:http://blog.csdn.net/fanfan_v5/article/details/61310045
  elastic-job是当当开源的一款非常好用的作业框架,在这之前,我们开发定时任务一般都是使用quartz或者spring-task(ScheduledExecutorService),无论是使用quartz还是spring-task,我们都会至少遇到两个痛点:
  1.不敢轻易跟着应用服务多节点部署,可能会重复多次执行而引发系统逻辑的错误。
  2.quartz的集群仅仅只是用来HA,节点数量的增加并不能给我们的每次执行效率带来提升,即不能实现水平扩展。
  本篇博文将会自顶向下地介绍elastic-job,让大家认识了解并且快速搭建起环境。
elastic-job产品线说明
  elastic-job在2.x之后,出了两个产品线:Elastic-Job-Lite和Elastic-Job-Cloud。我们一般使用Elastic-Job-Lite就能够满足需求,本文也是以Elastic-Job-Lite为主。1.x系列对应的就只有Elastic-Job-Lite,并且在2.x里修改了一些核心类名,差别虽大,原理类似,建议使用2.x系列。写此博文,最新release版本为2.0.5。

elastic-job-lite原理
  举个典型的job场景,比如余额宝里的昨日收益,系统需要job在每天某个时间点开始,给所有余额宝用户计算收益。如果用户数量不多,我们可以轻易使用quartz来完成,我们让计息job在某个时间点开始执行,循环遍历所有用户计算利息,这没问题。可是,如果用户体量特别大,我们可能会面临着在第二天之前处理不完这么多用户。另外,我们部署job的时候也得注意,我们可能会把job直接放在我们的webapp里,webapp通常是多节点部署的,这样,我们的job也就是多节点,多个job同时执行,很容易造成重复执行,比如用户重复计息,为了避免这种情况,我们可能会对job的执行加锁,保证始终只有一个节点能执行,或者干脆让job从webapp里剥离出来,独自部署一个节点。
  elastic-job就可以帮助我们解决上面的问题,elastic底层的任务调度还是使用的quartz,通过zookeeper来动态给job节点分片。
  我们来看:
  很大体量的用户需要在特定的时间段内计息完成
  我们肯定是希望我们的任务可以通过集群达到水平扩展,集群里的每个节点都处理部分用户,不管用户数量有多庞大,我们只要增加机器就可以了,比如单台机器特定时间能处理n个用户,2台机器处理2n个用户,3台3n,4台4n...,再多的用户也不怕了。
  使用elastic-job开发的作业都是zookeeper的客户端,比如我希望3台机器跑job,我们将任务分成3片,框架通过zk的协调,最终会让3台机器分别分配到0,1,2的任务片,比如server0-->0,server1-->1,server2-->2,当server0执行时,可以只查询id%3==0的用户,server1执行时,只查询id%3==1的用户,server2执行时,只查询id%3==2的用户。
  任务部署多节点引发重复执行
  在上面的基础上,我们再增加server3,此时,server3分不到任务分片,因为只有3片,已经分完了。没有分到任务分片的作业程序将不执行。
  如果此时server2挂了,那么server2的分片项会分配给server3,server3有了分片,就会替代server2执行。
  如果此时server3也挂了,只剩下server0和server1了,框架也会自动把server3的分片随机分配给server0或者server1,可能会这样,server0-->0,server1-->1,2。
  这种特性称之为弹性扩容,即elastic-job名称的由来。
代码演示
  我们搭建环境通过示例代码来演示上面的例子,elastic-job是不支持单机多实例的,通过zk的协调分片是以ip为单元的。很多同学上来可能就是通过单机多实例来学习,结果导致分片和预期不一致。这里没办法,只能通过多机器或者虚拟机,我们这里使用虚拟机,另外,由于资源有限,我们这里仅仅只模拟两台机器。
  节点说明:
  本地宿主机器
  zookeeper、job
  192.168.241.1
  虚拟机
  job
  192.168.241.128
  环境说明:
  Java
  请使用JDK1.7及其以上版本。
  Zookeeper
  请使用Zookeeper3.4.6及其以上版本
  Elastic-Job-Lite
  2.0.5(2.x系列即可,最好是2.0.4及其以上,因为2.0.4版本有本人提交的少许代码,(*^__^*) 嘻嘻……)
  需求说明:
  通过两台机器演示动态分片
  step1. 引入框架的jar包
  

  
    com.dangdang
  
    elastic-job-lite-core
  
    2.0.5
  

  

  

  
    com.dangdang
  
    elastic-job-lite-spring
  
    2.0.5
  

  step2. 编写job
package com.fanfan.sample001;  

  
import com.dangdang.ddframe.job.api.ShardingContext;
  
import com.dangdang.ddframe.job.api.simple.SimpleJob;
  

  
import java.util.Date;
  

  
/**
  
* Created by fanfan on 2016/12/20.
  
*/
  
public class MySimpleJob implements SimpleJob {
  
    @Override
  
    public void execute(ShardingContext shardingContext) {
  
        System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, 当前分片项: %s",
  
                Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
  
        /**
  
         * 实际开发中,有了任务总片数和当前分片项,就可以对任务进行分片执行了
  
         * 比如 SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem
  
         */
  
    }
  
}
  Step3. Spring配置
  

  
   
  
   
  

  
   
  
   
  

  

  
  Case1. 单节点
  



  
  Case2. 增加一个节点
  





  Case3. 断开一个节点



作业类型
  elastic-job提供了三种类型的作业:Simple类型作业、Dataflow类型作业、Script类型作业。这里主要讲解前两者。Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本,使用不多,可以参见github文档。
  SimpleJob需要实现SimpleJob接口,意为简单实现,未经过任何封装,与quartz原生接口相似,比如示例代码中所使用的job。
  Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
  可通过DataflowJobConfiguration配置是否流式处理。
  流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。
  实际开发中,Dataflow类型的job还是很有好用的。
  比如拿余额宝计息来说:
package com.fanfan.sample001;  

  
import com.dangdang.ddframe.job.api.ShardingContext;
  
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
  

  
import java.util.ArrayList;
  
import java.util.List;
  

  
/**
  
* Created by fanfan on 2016/12/23.
  
*/
  
public class MyDataFlowJob implements DataflowJob {
  

  
    /*
  
        status
  
        0:待处理
  
        1:已处理
  
     */
  

  
    @Override
  
    public List fetchData(ShardingContext shardingContext) {
  
        List users = null;
  
        /**
  
         * users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30
  
         */
  
        return users;
  
    }
  

  
    @Override
  
    public void processData(ShardingContext shardingContext, List data) {
  
        for (User user: data) {
  
            System.out.println(String.format("用户 %s 开始计息", user.getUserId()));
  
            user.setStatus(1);
  
            /**
  
             * update user
  
             */
  
        }
  
    }
  
}

其它功能
  上述介绍的是最精简常用的功能。elastic-job的功能集还不止这些,比如像作业事件追踪、任务监听等,另外,elastic-job-lite-console作为一个独立的运维平台还提供了用来查询和操作任务的web页面。
  这些增强的功能读者可以在github/elastic-job上自行学习,相信有了本篇博文的基础,再阅读那些文档就特别简单了。



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669208-1-1.html 上篇帖子: Kibana使用安装 下篇帖子: 使用Docker1.13.1快速部署ELK环境
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表