设为首页 收藏本站
查看: 981|回复: 0

Hadoop中共享全局信息的几种方法

[复制链接]

尚未签到

发表于 2015-11-11 14:18:03 | 显示全部楼层 |阅读模式
  在编写Hadoop MapReduce程序的过程中有时候需要在各个Mapper或者Reducer中使用一些共享的全局数据,例如在处理整数数据表格的时候有时候需要让每个Reducer知道各个列的取值范围或是一些图算法中需要让各个Reducer知道图的连通关系。
  



加入key/value对
通用,但效率不高
将共享文件放在HDFS上,采用Hadoop的文件操作API访问


通用,效率一般(可读可写)
将共享信息加入JobConf/Configure对象,使用set/get系列方法访存
较适用于小信息,效率最高
将共享信息加入DistributedCache对象
较适用于大量共享信息(只能读)  


  1, 最基本的方法是把需要共享的信息加到key/value对中。这种方法简单易行(用Text表示value,然后在正常数据后面加间隔符和全局数据),但是网络效率和处理效率都受到非常严重的影响。另外有时候还需要重新设计MR的内容。
  2, 把共享文件放在HDFS上,在每个Mapper/Reducer中使用HDFS的文件API去访问。这种方法比较通用,但是需要涉及HDFS的文件操作,较为复杂且效率会受到影响。
  读写HDFS的API与标准Java文件API有一点差异,需要使用特定的对象来创建InputStream/OutputStream。下面举一个从HDFS文件中读取信息的例子。
  其中的关键点在于:首先根据当前的JobConf获得当前的文件系统(它默认从hadoop下的配置文件中读取相关信息,同样适用于单节点模式);然后要使用FileSystem的成员方法open打开文件(它返回一个FSDataInputStream,它是InputStream的子类),千万不要试图使用一般的Java文件API打开输入流或直接使用Hadoop的Path打开文件,如new Scanner(p.toString())或new
Scanner(new Path(hdfs.getHomeDirectory(),p).toString()),会出现找不到文件的异常(即使文件就在所显示的目录里面)

  

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
class XXX{
private int N;
List<Integer> D=new ArrayList<Integer>();
.....
private void setConfByHDFS(Path p, JobConf conf) throws IOException {
FileSystem hdfs = FileSystem.get(conf);
Scanner s = new Scanner(hdfs.open(p));//使用hdfs.open打开文件输入流
N = s.nextInt();for (int i = 0; i < N; i++) {
D.add(s.nextInt());
}
s.close();
}
}
3, 使用JobConf的set*方法写入配置信息,再在Mapper/Reducer的configure方法里面使用JobConf的get*方法读取相关信息。  
  由于信息是写入JobConf的,读取的时候不设计HDFS的读写,效率最高。但是这种方法难以共享大量信息。比较适合设置一些全局变量。
  实现的时候需要重载Mapper/Reducer的configure方法。
  set*方法在JobConf中根据指定的名字创建一个指定类型&#20540;,get*方法根据名字访问已经存入的&#20540;,对于基本类型可以通过一个额外的参数指定访问失败时返回的默认&#20540;(class方法失败时返回null)。可以使用setInt/getInt,setFloat/getFloat这样的方法存取如int、float这样的类型;存取单个字符串直接使用set/get方法;setStrings/getStrings方法的访问的是一个String类型的数组。
  

class XXX{
...
public static class CSVReducer extends MapReduceBase implements
Reducer<IntWritable, IntWritable, IntWritable, VectorIntWritable> {
private int N=0;
private ArrayList<Integer> D = new ArrayList<Integer>();
@Override
public void configure(JobConf job) {//只有这里能访问到JobConf
super.configure(job);
N=job.getInt(&quot;csvcount.conf.num&quot;, -1);//访问共享信息
String str = job.get(&quot;csvcount.conf.d&quot;);
for (String s : str.split(&quot;,&quot;)) {
D.add(Integer.parseInt(s));
}
}
@Override
public void reduce(IntWritable key, Iterator<IntWritable> values,
OutputCollector<IntWritable, VectorIntWritable> output, Reporter reporter) throws IOException {
int[] res = new int[D.get(key.get())];
// System.out.println(D.get(key.get()));
...
}
}
private void setConfByConfigure(Path p, JobConf conf) throws IOException {//创建任务后调用本函数类写入全局共享信息
FileSystem hdfs = FileSystem.get(conf);
Scanner s = new Scanner(hdfs.open(p));
int N = s.nextInt();
ArrayList<Integer> D = new ArrayList<Integer>();
for (int i = 0; i < N; i++) {
D.add(s.nextInt());
}
s.close();
conf.setInt(&quot;csvcount.conf.num&quot;, N);//写入共享信息
conf.set(&quot;csvcount.conf.d&quot;, D.toString().replaceAll(&quot;[\\[\\] ]&quot;, &quot;&quot;));
}

  4, 写入DistributedCache。它是Hadoop专门为共享一些只读的全局信息提供的一个较为简单的机制。Hadoop将所有加入DistributedCache的文件都copy了一份到相关节点的本地临时目录中(还记得配置hadoop时候的配过的那个需要写本地路径的临时目录项吗?),因此对这些文件的读写完全是本地文件的读写操作。因为这些文件只被从HDFS复制到了本地而不回传,所以对它们的写操作是没有意义的也是无法共享的。
  使用的时候需要先调用DistributedCache的静态方法addCacheFile将共享文件/目录的URI加入到任务JobConf中;访问之前使用DistributedCache的另一个静态方法getLocalCachedFiles将job中的共享文件全都列出来,然后就可以使用标准的Java文件API打开文件了。
  在Mapper/Reducer中需要重载configure方法。
  

public class WordCount2 extends Configured implements Tool {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private Set<String> patternsToSkip = new HashSet<String>();
public void configure(JobConf job) {//重载的configure方法,用来从job中获取DistributedCache信息
if (job.getBoolean(&quot;wordcount2.skip.patterns&quot;, false)) {
Path[] patternsFiles = new Path[0];
try {
patternsFiles = DistributedCache.getLocalCacheFiles(job);//获取DistributedCache文件数组
} catch (IOException ioe) {
System.err.println(&quot;Caught exception while getting cached files: &quot;
+ StringUtils.stringifyException(ioe));
}
for (Path patternsFile : patternsFiles)
parseSkipFile(patternsFile);
}
}
private void parseSkipFile(Path patternsFile) {
try {
BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));//正常打开文件
String pattern = null;
while ((pattern = fis.readLine()) != null) {
patternsToSkip.add(pattern);
}
fis.close();
} catch (IOException ioe) {}
}
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {....}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {...}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), WordCount2.class);
conf.setJobName(&quot;wordcount2&quot;);
...
List<String> other_args = new ArrayList<String>();
for (int i = 0; i < args.length; ++i) {
if (&quot;-skip&quot;.equals(args)) {
DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);//设置DistributedCache
conf.setBoolean(&quot;wordcount2.skip.patterns&quot;, true);
} else {
other_args.add(args);
}
}
FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
JobClient.runJob(conf);
return 0;
}
}
  
  



原载于http://blog.iyunv.com/yanxiangtianji


转载请注明出处






版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-137964-1-1.html 上篇帖子: Hadoop实战实例 下篇帖子: Hadoop作业调优参数整理
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表