设为首页 收藏本站
查看: 992|回复: 0

[经验分享] Hadoop中一些采样器的实现

[复制链接]

尚未签到

发表于 2016-12-11 07:50:21 | 显示全部楼层 |阅读模式

Hadoop中采样是由org.apache.hadoop.mapred.lib.InputSampler类来实现的。


InputSampler类实现了三种采样方法:SplitSampler、RandomSampler和IntervalSampler。
SplitSampler、RandomSampler和IntervalSampler都是InputSampler的静态内部类,它们都实现了InputSampler的内部接口Sampler接口:
[java] view plaincopy


  • publicinterfaceSampler<K,V>{
  • K[]getSample(InputFormat<K,V>inf,JobConfjob)throwsIOException;
  • }

getSample方法根据job的配置信息以及输入格式获得抽样结果,三个采样类各自有不同的实现。


RandomSampler随机地从输入数据中抽取Key,是一个通用的采样器。RandomSampler类有三个属性:freq(一个Key被选中的概率),numSamples(从所有被选中的分区中获得的总共的样本数目),maxSplitsSampled(需要检查扫描的最大分区数目)。
RandomSampler中getSample方法的实现如下:

[java] view plaincopy


  • publicK[]getSample(InputFormat<K,V>inf,JobConfjob)throwsIOException{
  • InputSplit[]splits=inf.getSplits(job,job.getNumMapTasks());
  • ArrayList<K>samples=newArrayList<K>(numSamples);
  • intsplitsToSample=Math.min(maxSplitsSampled,splits.length);

  • Randomr=newRandom();
  • longseed=r.nextLong();
  • r.setSeed(seed);
  • LOG.debug("seed:"+seed);
  • //shufflesplits
  • for(inti=0;i<splits.length;++i){
  • InputSplittmp=splits;
  • intj=r.nextInt(splits.length);
  • splits=splits[j];
  • splits[j]=tmp;
  • }
  • //ourtargetrateisintermsofthemaximumnumberofsamplesplits,
  • //butweacceptthepossibilityofsamplingadditionalsplitstohit
  • //thetargetsamplekeyset
  • for(inti=0;i<splitsToSample||
  • (i<splits.length&&samples.size()<numSamples);++i){
  • RecordReader<K,V>reader=inf.getRecordReader(splits,job,
  • Reporter.NULL);
  • Kkey=reader.createKey();
  • Vvalue=reader.createValue();
  • while(reader.next(key,value)){
  • if(r.nextDouble()<=freq){
  • if(samples.size()<numSamples){
  • samples.add(key);
  • }else{
  • //Whenexceedingthemaximumnumberofsamples,replacea
  • //randomelementwiththisone,thenadjustthefrequency
  • //toreflectthepossibilityofexistingelementsbeing
  • //pushedout
  • intind=r.nextInt(numSamples);
  • if(ind!=numSamples){
  • samples.set(ind,key);
  • }
  • freq*=(numSamples-1)/(double)numSamples;
  • }
  • key=reader.createKey();
  • }
  • }
  • reader.close();
  • }
  • return(K[])samples.toArray();
  • }

首先通过InputFormat的getSplits方法得到所有的输入分区;然后确定需要抽样扫描的分区数目,取输入分区总数与用户输入的maxSplitsSampled两者的较小的值得到splitsToSample;然后对输入分区数组shuffle排序,打乱其原始顺序;然后循环逐个扫描每个分区中的记录进行采样,循环的条件是当前已经扫描的分区数小于splitsToSample或者当前已经扫描的分区数超过了splitsToSample但是小于输入分区总数并且当前的采样数小于最大采样数numSamples。
每个分区中记录采样的具体过程如下:
从指定分区中取出一条记录,判断得到的随机浮点数是否小于等于采样频率freq,如果大于则放弃这条记录,然后判断当前的采样数是否小于最大采样数,如果小于则这条记录被选中,被放进采样集合中,否则从【0,numSamples】中选择一个随机数,如果这个随机数不等于最大采样数numSamples,则用这条记录替换掉采样集合随机数对应位置的记录,同时采样频率freq减小变为freq*(numSamples-1)/numSamples。然后依次遍历分区中的其它记录。


SplitSampler从s个分区中采样前n个记录,是采样随机数据的一种简便方式。SplitSampler类有两个属性:numSamples(最大采样数),maxSplitsSampled(最大分区数)。其getSample方法实现如下:

[java] view plaincopy


  • publicK[]getSample(InputFormat<K,V>inf,JobConfjob)throwsIOException{
  • InputSplit[]splits=inf.getSplits(job,job.getNumMapTasks());
  • ArrayList<K>samples=newArrayList<K>(numSamples);
  • intsplitsToSample=Math.min(maxSplitsSampled,splits.length);
  • intsplitStep=splits.length/splitsToSample;
  • intsamplesPerSplit=numSamples/splitsToSample;
  • longrecords=0;
  • for(inti=0;i<splitsToSample;++i){
  • RecordReader<K,V>reader=inf.getRecordReader(splits[i*splitStep],
  • job,Reporter.NULL);
  • Kkey=reader.createKey();
  • Vvalue=reader.createValue();
  • while(reader.next(key,value)){
  • samples.add(key);
  • key=reader.createKey();
  • ++records;
  • if((i+1)*samplesPerSplit<=records){
  • break;
  • }
  • }
  • reader.close();
  • }
  • return(K[])samples.toArray();
  • }

首先根据InputFormat得到输入分区数组;然后确定需要采样的分区数splitsToSample为最大分区数和输入分区总数之间的较小值;然后确定对分区采样时的间隔splitStep为输入分区总数除splitsToSample的商;然后确定每个分区的采样数samplesPerSplit为最大采样数除splitsToSample的商。被采样的分区下标为i*splitStep,已经采样的分区数目达到splitsToSample即停止采样。
对于每一个分区,读取一条记录,将这条记录添加到样本集合中,如果当前样本数大于当前的采样分区所需要的样本数,则停止对这个分区的采样。如此循环遍历完这个分区的所有记录。


IntervalSampler根据一定的间隔从s个分区中采样数据,非常适合对排好序的数据采样。IntervalSampler类有两个属性:freq(哪一条记录被选中的概率),maxSplitsSampled(采样的最大分区数)。其getSample方法实现如下:

[java] view plaincopy


  • publicK[]getSample(InputFormat<K,V>inf,JobConfjob)throwsIOException{
  • InputSplit[]splits=inf.getSplits(job,job.getNumMapTasks());
  • ArrayList<K>samples=newArrayList<K>();
  • intsplitsToSample=Math.min(maxSplitsSampled,splits.length);
  • intsplitStep=splits.length/splitsToSample;
  • longrecords=0;
  • longkept=0;
  • for(inti=0;i<splitsToSample;++i){
  • RecordReader<K,V>reader=inf.getRecordReader(splits[i*splitStep],
  • job,Reporter.NULL);
  • Kkey=reader.createKey();
  • Vvalue=reader.createValue();
  • while(reader.next(key,value)){
  • ++records;
  • if((double)kept/records<freq){
  • ++kept;
  • samples.add(key);
  • key=reader.createKey();
  • }
  • }
  • reader.close();
  • }
  • return(K[])samples.toArray();
  • }

首先根据InputFormat得到输入分区数组;然后确定需要采样的分区数splitsToSample为最大分区数和输入分区总数之间的较小值;然后确定对分区采样时的间隔splitStep为输入分区总数除splitsToSample的商。被采样的分区下标为i*splitStep,已经采样的分区数目达到splitsToSample即停止采样。
对于每一个分区,读取一条记录,如果当前样本数与已经读取的记录数的比值小于freq,则将这条记录添加到样本集合,否则读取下一条记录。这样依次循环遍历完这个分区的所有记录。


Hadoop中采样是由org.apache.hadoop.mapred.lib.InputSampler类来实现的。


InputSampler类实现了三种采样方法:SplitSampler、RandomSampler和IntervalSampler。
SplitSampler、RandomSampler和IntervalSampler都是InputSampler的静态内部类,它们都实现了InputSampler的内部接口Sampler接口:
[java] view plaincopy


  • publicinterfaceSampler<K,V>{
  • K[]getSample(InputFormat<K,V>inf,JobConfjob)throwsIOException;
  • }

getSample方法根据job的配置信息以及输入格式获得抽样结果,三个采样类各自有不同的实现。


RandomSampler随机地从输入数据中抽取Key,是一个通用的采样器。RandomSampler类有三个属性:freq(一个Key被选中的概率),numSamples(从所有被选中的分区中获得的总共的样本数目),maxSplitsSampled(需要检查扫描的最大分区数目)。
RandomSampler中getSample方法的实现如下:

[java] view plaincopy


  • publicK[]getSample(InputFormat<K,V>inf,JobConfjob)throwsIOException{
  • InputSplit[]splits=inf.getSplits(job,job.getNumMapTasks());
  • ArrayList<K>samples=newArrayList<K>(numSamples);
  • intsplitsToSample=Math.min(maxSplitsSampled,splits.length);

  • Randomr=newRandom();
  • longseed=r.nextLong();
  • r.setSeed(seed);
  • LOG.debug("seed:"+seed);
  • //shufflesplits
  • for(inti=0;i<splits.length;++i){
  • InputSplittmp=splits;
  • intj=r.nextInt(splits.length);
  • splits=splits[j];
  • splits[j]=tmp;
  • }
  • //ourtargetrateisintermsofthemaximumnumberofsamplesplits,
  • //butweacceptthepossibilityofsamplingadditionalsplitstohit
  • //thetargetsamplekeyset
  • for(inti=0;i<splitsToSample||
  • (i<splits.length&&samples.size()<numSamples);++i){
  • RecordReader<K,V>reader=inf.getRecordReader(splits,job,
  • Reporter.NULL);
  • Kkey=reader.createKey();
  • Vvalue=reader.createValue();
  • while(reader.next(key,value)){
  • if(r.nextDouble()<=freq){
  • if(samples.size()<numSamples){
  • samples.add(key);
  • }else{
  • //Whenexceedingthemaximumnumberofsamples,replacea
  • //randomelementwiththisone,thenadjustthefrequency
  • //toreflectthepossibilityofexistingelementsbeing
  • //pushedout
  • intind=r.nextInt(numSamples);
  • if(ind!=numSamples){
  • samples.set(ind,key);
  • }
  • freq*=(numSamples-1)/(double)numSamples;
  • }
  • key=reader.createKey();
  • }
  • }
  • reader.close();
  • }
  • return(K[])samples.toArray();
  • }

首先通过InputFormat的getSplits方法得到所有的输入分区;然后确定需要抽样扫描的分区数目,取输入分区总数与用户输入的maxSplitsSampled两者的较小的值得到splitsToSample;然后对输入分区数组shuffle排序,打乱其原始顺序;然后循环逐个扫描每个分区中的记录进行采样,循环的条件是当前已经扫描的分区数小于splitsToSample或者当前已经扫描的分区数超过了splitsToSample但是小于输入分区总数并且当前的采样数小于最大采样数numSamples。
每个分区中记录采样的具体过程如下:
从指定分区中取出一条记录,判断得到的随机浮点数是否小于等于采样频率freq,如果大于则放弃这条记录,然后判断当前的采样数是否小于最大采样数,如果小于则这条记录被选中,被放进采样集合中,否则从【0,numSamples】中选择一个随机数,如果这个随机数不等于最大采样数numSamples,则用这条记录替换掉采样集合随机数对应位置的记录,同时采样频率freq减小变为freq*(numSamples-1)/numSamples。然后依次遍历分区中的其它记录。


SplitSampler从s个分区中采样前n个记录,是采样随机数据的一种简便方式。SplitSampler类有两个属性:numSamples(最大采样数),maxSplitsSampled(最大分区数)。其getSample方法实现如下:

[java] view plaincopy


  • publicK[]getSample(InputFormat<K,V>inf,JobConfjob)throwsIOException{
  • InputSplit[]splits=inf.getSplits(job,job.getNumMapTasks());
  • ArrayList<K>samples=newArrayList<K>(numSamples);
  • intsplitsToSample=Math.min(maxSplitsSampled,splits.length);
  • intsplitStep=splits.length/splitsToSample;
  • intsamplesPerSplit=numSamples/splitsToSample;
  • longrecords=0;
  • for(inti=0;i<splitsToSample;++i){
  • RecordReader<K,V>reader=inf.getRecordReader(splits[i*splitStep],
  • job,Reporter.NULL);
  • Kkey=reader.createKey();
  • Vvalue=reader.createValue();
  • while(reader.next(key,value)){
  • samples.add(key);
  • key=reader.createKey();
  • ++records;
  • if((i+1)*samplesPerSplit<=records){
  • break;
  • }
  • }
  • reader.close();
  • }
  • return(K[])samples.toArray();
  • }

首先根据InputFormat得到输入分区数组;然后确定需要采样的分区数splitsToSample为最大分区数和输入分区总数之间的较小值;然后确定对分区采样时的间隔splitStep为输入分区总数除splitsToSample的商;然后确定每个分区的采样数samplesPerSplit为最大采样数除splitsToSample的商。被采样的分区下标为i*splitStep,已经采样的分区数目达到splitsToSample即停止采样。
对于每一个分区,读取一条记录,将这条记录添加到样本集合中,如果当前样本数大于当前的采样分区所需要的样本数,则停止对这个分区的采样。如此循环遍历完这个分区的所有记录。


IntervalSampler根据一定的间隔从s个分区中采样数据,非常适合对排好序的数据采样。IntervalSampler类有两个属性:freq(哪一条记录被选中的概率),maxSplitsSampled(采样的最大分区数)。其getSample方法实现如下:

[java] view plaincopy


  • publicK[]getSample(InputFormat<K,V>inf,JobConfjob)throwsIOException{
  • InputSplit[]splits=inf.getSplits(job,job.getNumMapTasks());
  • ArrayList<K>samples=newArrayList<K>();
  • intsplitsToSample=Math.min(maxSplitsSampled,splits.length);
  • intsplitStep=splits.length/splitsToSample;
  • longrecords=0;
  • longkept=0;
  • for(inti=0;i<splitsToSample;++i){
  • RecordReader<K,V>reader=inf.getRecordReader(splits[i*splitStep],
  • job,Reporter.NULL);
  • Kkey=reader.createKey();
  • Vvalue=reader.createValue();
  • while(reader.next(key,value)){
  • ++records;
  • if((double)kept/records<freq){
  • ++kept;
  • samples.add(key);
  • key=reader.createKey();
  • }
  • }
  • reader.close();
  • }
  • return(K[])samples.toArray();
  • }

首先根据InputFormat得到输入分区数组;然后确定需要采样的分区数splitsToSample为最大分区数和输入分区总数之间的较小值;然后确定对分区采样时的间隔splitStep为输入分区总数除splitsToSample的商。被采样的分区下标为i*splitStep,已经采样的分区数目达到splitsToSample即停止采样。
对于每一个分区,读取一条记录,如果当前样本数与已经读取的记录数的比值小于freq,则将这条记录添加到样本集合,否则读取下一条记录。这样依次循环遍历完这个分区的所有记录。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312497-1-1.html 上篇帖子: 重温Hadoop(2)-- MapReduce流程及partition 下篇帖子: Data ETL tools for hadoop ecosystem Morphlines
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表