设为首页 收藏本站
查看: 647|回复: 0

[经验分享] 大数据框架hadoop之Observe设计模式应用

[复制链接]
发表于 2016-12-12 11:20:23 | 显示全部楼层 |阅读模式
       Observer观察者设计模式是行为模式的一种,它的作用是当一个对象的状态发生变化时,能够自动通知其他关联对象,自动刷新对象状态
       Observer模式提供给关联对象一种同步通信的手段,使某个对象与依赖它的其他对象之间保持状态同步。如下用代码的形式来展现被观察者(新闻出版社)和观察者(与之关联的订户观察者对象)是如何保持信息同步的。
1       被观察者-新闻出版社
/**
 * NewsPublisher: 新闻出版社
 */
class NewsPublisher extends Observable {
    public void publishNews(String newsTitle, String newsBody) {
        News news = new News(newsTitle, newsBody);
        setChanged();   //通过setChanged()方法标明对象的状态已发生变化
        this.notifyObservers(news);   //通知各Observer,并发送一个名为news对象的消息
 
        // ... ...
    }
}
2       观察者-订户观察者
/**
 * 订户观察者。
 * Created by myuser on 2014/11/29.
 */
public class SubscriberObserver implements Observer {
    // 新闻出版社调用notifyObservers(news)方法,自动调用如下方法以保持信息同步。
    public void update(Observable observee, Object param) {
        if (param instanceof News) {
            mail2Subscriber((News)param);
        }
    }
 
    private void mail2Subscriber(News news) {
        System.out.println("Mail to subscriber. A news published with title:" + news.getTitle());
    }
}
3       构造者
    被观察者调用notifyObservers()方法后,为什么观察者就能接收到呢?那是因为有构造者这个角色,它将观察者添加到被观察者的依赖对象里面,代码如下:
public class Client {
    /**
     * Test Observer Pattern
     */
    public static void main(String[] args) {
        NewsPublisher publisher = new NewsPublisher();
        // 添加订户观察者依赖对象
        publisher.addObserver(new SubscriberObserver());
        //发布新闻,触发通知事件
        publisher.publishNews("Hello news", "news body");
    }
}
上面是一个简单的Observer观察者设计模式的实例。接下来看看大数据框架hadoop是如何应用该设计模式的。
应用场景如下:JobTracker收到作业后,并不会马上对其初始化,而是交给调度器,由它按照一定的策略对作业进行初始化。
4       被观察者-JobTracker
    JobTracker进行作业添加(执行addJob()方法)时,会同步该消息到对应的观察者(JobInProgressListener )那里,代码如下:
publicclass JobTracker {
  privatefinal List<JobInProgressListener> jobInProgressListeners =
    new CopyOnWriteArrayList<JobInProgressListener>();
  private synchronized JobStatus addJob(JobID jobId, JobInProgress
job) {
      ... ...
      for (JobInProgressListener listener : jobInProgressListeners) {
        listener.jobAdded(job);  //通知各Observer,并发送job消息
      }
      ... ...
  }
}
5       观察者-JobQueueJobInProgressListener
class JobQueueJobInProgressListener extends JobInProgressListener {
  private Map<JobSchedulingInfo, JobInProgress> jobQueue;
  @Override
  publicvoid jobAdded(JobInProgress job) {
    // 将作业添加到作业队列里。
    jobQueue.put(new JobSchedulingInfo(job.getStatus()), job);
  }
}
6       构造者
    接下来看一下JobTracker和JobQueueJobInProgressListener的依赖关系是如何建立起来的。
    先看一下JobTracker类的一些信息,代码如下所示:
publicclass JobTracker implements MRConstants, InterTrackerProtocol,
    JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,
    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,
JobTrackerMXBean {
  // 该类实例化的时候,会从配置文件的属性mapred.jobtracker.taskScheduler获取调度器的类名,然后实例化一个调度器作为JobTracker的一个属性。代码如下所示:
  JobTracker(final JobConf conf, String identifier, Clock clock, QueueManager qm) {
... ...
    Class<? extends TaskScheduler> schedulerClass
      = conf.getClass("mapred.jobtracker.taskScheduler",
          JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
... ...
  }
  // JobTracker类被运行的时候会去调用startTracker()静态方法和offerService(),代码如下所示:
  publicstaticvoid main(String argv[]
                          ) throws IOException, InterruptedException {
... ...
    JobTracker tracker = startTracker(new JobConf());
tracker.offerService();
... ...
  }
// startTracker()静态方法首先实例一个JobTracker类,然后将当前实例赋给调度器的taskTrackerManager属性。
publicstatic JobTracker startTracker(JobConf conf, String identifier)
  throws IOException, InterruptedException {
... ...
    result = new JobTracker(conf, identifier);
result.taskScheduler.setTaskTrackerManager(result);
... ...
returnresult;
  }
  // offerService()方法调用调度器的start()方法。
  publicvoid offerService() throws InterruptedException, IOException {
    ... ...
taskScheduler.start();
... ...
  }
}
// 如下是调度器的start()方法,该方法将JobQueueJobInProgressListener类添加到JobTracker的依赖属性里,也即构造了JobTracker和JobQueueJobInProgressListener的被观察者与观察者关系。
class JobQueueTaskScheduler extends TaskScheduler {
  public JobQueueTaskScheduler() {
    this.jobQueueJobInProgressListener=new JobQueueJobInProgressListener();
  }
  publicsynchronizedvoid start() throws IOException {
    ... ...
taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
... ...
  }
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-313246-1-1.html 上篇帖子: Hadoop之Cloudera Manager安装问题总结【转】 下篇帖子: 解决Eclipse中运行WordCount出现 java.lang.ClassNotFoundException: org.apache.hadoop.exam
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表