设为首页 收藏本站
查看: 1025|回复: 0

[经验分享] Apache Tez DAG计算应用框架

[复制链接]

尚未签到

发表于 2017-1-7 07:53:03 | 显示全部楼层 |阅读模式
  转自:http://zcdeng.iteye.com/blog/1897208
  

1. Tez简介
Tez是基于Hadoop Yarn之上的DAG(有向无环图,Directed Acyclic Graph)计算框架。它把Map/Reduce过程拆分成若干个子过程,同时可以把多个Map/Reduce任务组合成一个较大的DAG任务,减少了Map/Reduce之间的文件存储。同时合理组合其子过程,也可以减少任务的运行时间。
2. DAG计算模型
Map/Reduce不能解决所有问题,它适合在分布式环境中处理那些海量数据批处理计算程序,其计算模型主要分为两阶段:第一阶段为Map阶段,输出的是<Key, Value>Pair对;再进行数据的Shuffle和Sort;然进入第二阶段Reduce阶段,在这一阶段就是对<Key, Value>对的计算逻辑处理。但是它无法更好地完成要求更高的计算任务,例如图计算中需要BSP迭代计算框架,要把上一个Map/Reduce任务的输出用于下一个Map/Reduce任务的输入;类似Hive和Pig的交互式有向图计算。DAG计算模型是针对Map/Reduce所遇问题而提出来的一种计算模型。下图是Map/Reduce模型与DAG模型的差别。
DSC0000.png

从图中可以看出:当采用Map/Reduce模型,我们处理一个大任务时需要四个Map/Reduce,那么就需要四个小Job来组合成一个大Job,这样会多几次的输入输出消耗。而采用Tez,它们形成一个大任务,这些子任务组合成一张DAG图,在数据的处理中间过程中,就没有往hdfs里面写数据,而是直接向它的后继节点输出数据。

3. Tez框架实现
在其中一篇技术博客Hadoop Yarn解决多类应用兼容方法讲到在Yarn上如何兼容各类应用的思路。在Hadoop Yarn上实现HamaBSP计算应用博文中讲解了如何在Yarn上开发出一个自己的应用。在这里,我将着重讲解在Tez应用的代码结构上,它是如何实现一个DAG计算模型。
从前面的博文中提到,对每个应用都需要去实现一个YARNRunner类去提交c对应的Job。在Tez里面,有一个这样的类org.apache.tez.mapreduce.YARNRunner。我们将以这个类为入口,讲解Tez的实现过程。
如下是Tez YARNRunner提交任务的实现代码。
Java代码 DSC0001.png


  • @Override
  • publicJobStatussubmitJob(JobIDjobId,StringjobSubmitDir,Credentialsts)
  • throwsIOException,InterruptedException{
  • //与MR应用一样,先向RM获得一个applicationID。
  • ApplicationIdappId=resMgrDelegate.getApplicationId();

  • FileSystemfs=FileSystem.get(conf);
  • //Loadsthejob.xmlwrittenbytheuser.
  • JobConfjobConf=newJobConf(newTezConfiguration(conf));

  • //ExtractindividualrawMRconfigs.
  • //为每个stage创建它自己的conf文件
  • Configuration[]stageConfs=MultiStageMRConfToTezTranslator
  • .getStageConfs(jobConf);

  • //TransformallconfstouseTezkeys
  • MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[0],
  • null);
  • for(inti=1;i<stageConfs.length;i++){
  • MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs,
  • stageConfs[i-1]);
  • }

  • //createinputstotezClient.submit()

  • //FIXMEsetupjobresources
  • Map<String,LocalResource>jobLocalResources=
  • createJobLocalResources(stageConfs[0],jobSubmitDir);

  • //FIXMEcreateDAGshouldtakethetezConfasaparameter,insteadofusing
  • //MRkeys.
  • //创建它的一个DAG图
  • DAGdag=createDAG(fs,jobId,stageConfs,jobSubmitDir,ts,
  • jobLocalResources);

  • //略去...,创建一堆与Appmaster相关的conf配置,用于启动Tez的appmaster所用

  • //SubmittoResourceManager
  • try{
  • PathappStagingDir=fs.resolvePath(newPath(jobSubmitDir));
  • //向集群提交DAG任务
  • dagClient=tezClient.submitDAGApplication(
  • appId,
  • dag,
  • appStagingDir,
  • ts,
  • jobConf.get(JobContext.QUEUE_NAME,
  • YarnConfiguration.DEFAULT_QUEUE_NAME),
  • vargs,
  • environment,
  • jobLocalResources,dagAMConf);

  • }catch(TezExceptione){
  • thrownewIOException(e);
  • }

  • returngetJobStatus(jobId);
  • }


上面的代码之中可以看出,它需要为该任务构造一个DAG图。下面是org.apache.tez.mapreduce.YARNRunner.createDAG(FileSystem, JobID, Configuration[], String, Credentials, Map<String, LocalResource>)的源码实现。
Java代码


  • privateDAGcreateDAG(FileSystemfs,JobIDjobId,Configuration[]stageConfs,
  • StringjobSubmitDir,Credentialsts,
  • Map<String,LocalResource>jobLocalResources)throwsIOException{
  • //为DAG任务命名
  • StringjobName=stageConfs[0].get(MRJobConfig.JOB_NAME,
  • YarnConfiguration.DEFAULT_APPLICATION_NAME);
  • DAGdag=newDAG(jobName);

  • LOG.info("Numberofstages:"+stageConfs.length);

  • TaskLocationHint[]mapInputLocations=getMapLocationHintsFromInputSplits(
  • jobId,fs,stageConfs[0],jobSubmitDir);
  • TaskLocationHint[]reduceInputLocations=null;
  • //各个子任务subtask的初始化
  • Vertex[]vertices=newVertex[stageConfs.length];//构造task节点
  • for(inti=0;i<stageConfs.length;i++){
  • vertices=createVertexForStage(stageConfs,jobLocalResources,
  • i==0?mapInputLocations:reduceInputLocations,i,
  • stageConfs.length);
  • }

  • for(inti=0;i<vertices.length;i++){
  • dag.addVertex(vertices);//向dag中添加任务节点
  • if(i>0){
  • EdgePropertyedgeProperty=newEdgeProperty(
  • ConnectionPattern.BIPARTITE,SourceType.STABLE,
  • newOutputDescriptor(OnFileSortedOutput.class.getName(),null),
  • newInputDescriptor(ShuffledMergedInput.class.getName(),null));

  • Edgeedge=null;
  • edge=newEdge(vertices[i-1],vertices,edgeProperty);
  • dag.addEdge(edge);//向DAG图中添加边的属性
  • }

  • }
  • returndag;
  • }

大任务的DAG计算信息都存储在Vertex和Edge里面。我们将在这里详细分析Vertex和Edge的关系。
下面是向RM提交的任务信息,用于启动tez appmaster。appmaster的启动类为org.apache.tez.dag.app.DAGAppMaster。
Java代码


  • privateApplicationSubmissionContextcreateApplicationSubmissionContext(
  • ApplicationIdappId,DAGdag,PathappStagingDir,Credentialsts,
  • StringamQueueName,StringamName,List<String>amArgs,
  • Map<String,String>amEnv,Map<String,LocalResource>amLocalResources,
  • TezConfigurationamConf)throwsIOException,YarnException{

  • //省略一些配置参数及方法(conf配置,环境变量classpath参数和appmasterJava命令)...
  • //emitprotobufDAGfilestyle
  • PathbinaryPath=newPath(appStagingDir,
  • TezConfiguration.TEZ_AM_PLAN_PB_BINARY+"."+appId.toString());
  • amConf.set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH,binaryPath.toUri()
  • .toString());

  • ConfigurationfinalAMConf=createFinalAMConf(amConf);

  • DAGPlandagPB=dag.createDag(finalAMConf);//用dag构建一个DAGPlan作业计划

  • FSDataOutputStreamdagPBOutBinaryStream=null;

  • try{
  • //binaryoutput
  • dagPBOutBinaryStream=FileSystem.create(fs,binaryPath,
  • newFsPermission(TEZ_AM_FILE_PERMISSION));
  • dagPB.writeTo(dagPBOutBinaryStream);//并且写到硬盘上
  • }finally{
  • if(dagPBOutBinaryStream!=null){
  • dagPBOutBinaryStream.close();
  • }
  • }
  • //省略localResources的配置信息...
  • //SetupContainerLaunchContextforAMcontainer
  • ContainerLaunchContextamContainer=
  • ContainerLaunchContext.newInstance(localResources,environment,
  • vargsFinal,null,securityTokens,acls);

  • //SetuptheApplicationSubmissionContext
  • ApplicationSubmissionContextappContext=Records
  • .newRecord(ApplicationSubmissionContext.class);

  • appContext.setApplicationType(TezConfiguration.TEZ_APPLICATION_TYPE);
  • appContext.setApplicationId(appId);
  • appContext.setResource(capability);
  • appContext.setQueue(amQueueName);
  • appContext.setApplicationName(amName);
  • appContext.setCancelTokensWhenComplete(conf.getBoolean(
  • TezConfiguration.TEZ_AM_CANCEL_DELEGATION_TOKEN,
  • TezConfiguration.DEFAULT_TEZ_AM_CANCEL_DELEGATION_TOKEN));
  • appContext.setAMContainerSpec(amContainer);

  • returnappContext;
  • }


4. Vertex & Edge
<续>
5. MapReduce
<续>

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-324852-1-1.html 上篇帖子: 使用Apache Shiro进行身份认证 下篇帖子: 提高开发效率的组件 -- Apache Commons
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表