设为首页 收藏本站
查看: 929|回复: 0

[经验分享] Hadoop Stream

[复制链接]

尚未签到

发表于 2015-7-12 09:39:36 | 显示全部楼层 |阅读模式
Apache > Hadoop > Core > common > docs > r0.19.2 > cn









  • 项目
  • 维基
  • Hadoop 0.18文档




Last Published: 07/01/2009 00:38:20





文档



概述

快速入门

集群搭建

HDFS构架设计

HDFS使用指南

HDFS权限指南

HDFS配额管理指南

命令手册

FS Shell使用指南

DistCp使用指南

Map-Reduce教程

Hadoop本地库



Streaming

Hadoop Archives

Hadoop On Demand

API参考

API Changes

维基

常见问题

邮件列表

发行说明

变更日志




PDF






使用自定义的方法切分行来形成Key/Value对
  之前已经提到,当Map/Reduce框架从mapper的标准输入读取一行时,
  它把这一行切分为key/value对。 在默认情况下,每行第一个tab符之前的部分作为key,
  之后的部分作为value(不包括tab符)。
  但是,用户可以自定义,可以指定分隔符是其他字符而不是默认的tab符,
  或者指定在第n(n>=1)个分割符处分割而不是默认的第一个。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-jobconf stream.map.output.field.separator=. \
-jobconf stream.num.map.output.key.fields=4

  在上面的例子,“-jobconf stream.map.output.field.separator=.”指定“.”作为map输出内容的分隔符,
  并且从在第四个“.”之前的部分作为key,之后的部分作为value(不包括这第四个“.”)。
  如果一行中的“.”少于四个,则整行的内容作为key,value设为空的Text对象(
  就像这样创建了一个Text:new Text(""))。
  同样,用户可以使用“-jobconf stream.reduce.output.field.separator=SEP”和“-jobconf
  stream.num.reduce.output.fields=NUM”来指定reduce输出的行中,第几个分隔符处分割key和value。

一个实用的Partitioner类 (二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 选项)
  Hadoop有一个工具类org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner, 它在应用程序中很有用。Map/reduce框架用这个类切分map的输出, 切分是基于key值的前缀,而不是整个key。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-jobconf stream.map.output.field.separator=. \
-jobconf stream.num.map.output.key.fields=4 \
-jobconf map.output.key.field.separator=. \
-jobconf num.key.fields.for.partition=2 \
-jobconf mapred.reduce.tasks=12

  其中,-jobconf stream.map.output.field.separator=.
  和-jobconf stream.num.map.output.key.fields=4是前文中的例子。
  Streaming用这两个变量来得到mapper的key/value对。
  上面的Map/Reduce 作业中map输出的key一般是由“.”分割成的四块。
  但是因为使用了 -jobconf num.key.fields.for.partition=2 选项,
  所以Map/Reduce框架使用key的前两块来切分map的输出。
  其中, -jobconf map.output.key.field.separator=. 指定了这次切分使用的key的分隔符。
  这样可以保证在所有key/value对中, key值前两个块值相同的所有key被分到一组,
  分配给一个reducer。
  这种高效的方法等价于指定前两块作为主键,后两块作为副键。 主键用于切分块,
  主键和副键的组合用于排序。一个简单的示例如下:
  Map的输出(key)

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

  切分给3个reducer(前两块的值用于切分)

11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2

  在每个切分后的组内排序(四个块的值都用于排序)

11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3



Hadoop聚合功能包的使用(-reduce aggregate 选项)
  Hadoop有一个工具包“Aggregate”
  ( https://svn.apache.org/repos/asf/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/libaggregate)。 “Aggregate”提供一个特殊的reducer类和一个特殊的combiner类, 并且有一系列的“聚合器”(“aggregator”)
  (例如“sum”,“max”,“min”等)用于聚合一组value的序列。
  用户可以使用Aggregate定义一个mapper插件类, 这个类用于为mapper输入的每个key/value对产生“可聚合项”。
   combiner/reducer利用适当的聚合器聚合这些可聚合项。
  要使用Aggregate,只需指定“-reducer aggregate”:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myAggregatorForKeyCount.py \
-reducer aggregate \
-file myAggregatorForKeyCount.py \
-jobconf mapred.reduce.tasks=12

  python程序myAggregatorForKeyCount.py例子:

#!/usr/bin/python
import sys;
def generateLongCountToken(id):
return "LongValueSum:" + id + "\t" + "1"
def main(argv):
line = sys.stdin.readline();
try:
while line:
line = line[:-1];
fields = line.split("\t");
print generateLongCountToken(fields[0]);
line = sys.stdin.readline();
except "end of file":
return None
if __name__ == "__main__":
main(sys.argv)


字段的选取(类似于unix中的 'cut' 命令)
  Hadoop的工具类org.apache.hadoop.mapred.lib.FieldSelectionMapReduce帮助用户高效处理文本数据,
  就像unix中的“cut”工具。工具类中的map函数把输入的key/value对看作字段的列表。 用户可以指定字段的
  分隔符(默认是tab), 可以选择字段列表中任意一段(由列表中一个或多个字段组成)作为map输出的key
  或者value。 同样,工具类中的reduce函数也把输入的key/value对看作字段的列表,用户可以选取任意一
  段作为reduce输出的key或value。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\
-reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-jobconf map.output.key.field.separa=. \
-jobconf num.key.fields.for.partition=2 \
-jobconf mapred.data.field.separator=. \
-jobconf map.output.key.value.fields.spec=6,5,1-3:0- \
-jobconf reduce.output.key.value.fields.spec=0-2:5- \
-jobconf mapred.reduce.tasks=12

  选项“-jobconf map.output.key.value.fields.spec=6,5,1-3:0-”指定了如何为map的输出选取key和valu
  e。Key选取规则和value选取规则由“:”分割。 在这个例子中,map输出的key由字段6,5,1,2和3组成
  。输出的value由所有字段组成(“0-”指字段0以及之后所有字段)。
  选项“-jobconf reduce.output.key.value.fields.spec=0-2:0-”(译者注:此处应为”0-2:5-“)指定如
  何为reduce的输出选取value。 本例中,reduce的输出的key将包含字段0,1,2(对应于原始的字段
  6,5,1)。 reduce输出的value将包含起自字段5的所有字段(对应于所有的原始字段)。


常见问题



我该怎样使用Hadoop Streaming运行一组独立(相关)的任务呢?
  多数情况下,你不需要Map Reduce的全部功能, 而只需要运行同一程序的多个实例,
  或者使用不同数据,或者在相同数据上使用不同的参数。
   你可以通过Hadoop Streaming来实现。


如何处理多个文件,其中每个文件一个map?
  例如这样一个问题,在集群上压缩(zipping)一些文件,你可以使用以下几种方法:


  • 使用Hadoop Streaming和用户编写的mapper脚本程序:

    • 生成一个文件,文件中包含所有要压缩的文件在HDFS上的完整路径。
      每个map 任务获得一个路径名作为输入。
    • 创建一个mapper脚本程序,实现如下功能:获得文件名,把该文件拷
      贝到本地,压缩该文件并把它发到期望的输出目录。


  • 使用现有的Hadoop框架:

    • 在main函数中添加如下命令:
             FileOutputFormat.setCompressOutput(conf, true);
      FileOutputFormat.setOutputCompressorClass
      (conf, org.apache.hadoop.io.compress.GzipCodec.class);
      conf.setOutputFormat(NonSplitableTextInputFormat.class);
      conf.setNumReduceTasks(0);


    • 编写map函数:
             public void map(WritableComparable key, Writable value,
      OutputCollector output,
      Reporter reporter) throws IOException {
      output.collect((Text)value, null);
      }
    • 注意输出的文件名和原文件名不同



应该使用多少个reducer?
  请参考Hadoop Wiki:Reducer


如果在Shell脚本里设置一个别名,并放在-mapper之后,Streaming会正常运行吗?

例如,alias cl='cut -fl',-mapper "cl"会运行正常吗?
  脚本里无法使用别名,但是允许变量替换,例如:

$ hadoop dfs -cat samples/student_marks
alice   50
bruce   70
charlie 80
dan     75
$ c2='cut -f2'; $HADOOP_HOME/bin/hadoop jar
$HADOOP_HOME/hadoop-streaming.jar \
-input /user/me/samples/student_marks
-mapper \"$c2\" -reducer 'cat'  
-output /user/me/samples/student_out
-jobconf mapred.job.name='Experiment'
$ hadoop dfs -ls samples/student_out
Found 1 items/user/me/samples/student_out/part-00000       16
$ hadoop dfs -cat samples/student_out/part-00000
50
70
75
80

我可以使用UNIX pipes吗?例如 –mapper "cut –fl | set s/foo/bar/g"管用么?
  现在不支持,而且会给出错误信息“java.io.IOException: Broken pipe”。这或许是一个bug,
  需要进一步研究。

在streaming作业中用-file选项运行一个分布式的超大可执行文件(例如,3.6G)时,
我得到了一个错误信息“No space left on device”。如何解决?

  配置变量stream.tmpdir指定了一个目录,在这个目录下要进行打jar包的操作。
  stream.tmpdir的默认值是/tmp,你需要将这个值设置为一个有更大空间的目录:

-jobconf stream.tmpdir=/export/bigspace/...

如何设置多个输入目录?
  可以使用多个-input选项设置多个输入目录:

hadoop jar hadoop-streaming.jar -input '/user/foo/dir1' -input '/user/foo/dir2'

如何生成gzip格式的输出文件?
  除了纯文本格式的输出,你还可以生成gzip文件格式的输出,你只需设置streaming作业中的选项‘-jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCode’。


Streaming中如何自定义input/output format?
  至少在Hadoop 0.14版本以前,不支持多个jar文件。所以当指定自定义的类时,
  你要把他们和原有的streaming jar打包在一起,并用这个自定义的jar包替换默认的hadoop streaming jar包。


Streaming如何解析XML文档?
  你可以使用StreamXmlRecordReader来解析XML文档。

hadoop jar hadoop-streaming.jar -inputreader "StreamXmlRecord
,begin=BEGIN_STRING,end=END_STRING" ..... (rest of the command)

  Map任务会把BEGIN_STRING和END_STRING之间的部分看作一条记录。


在streaming应用程序中如何更新计数器?
  streaming进程能够使用stderr发出计数器信息。 reporter:counter:,, 应该被发送到stderr来更新计数器。


如何更新streaming应用程序的状态?
  streaming进程能够使用stderr发出状态信息。 reporter:status: 要被发送到stderr来设置状态。






Last Published: 07/01/2009 00:38:20

Copyright © 2007 The Apache Software Foundation.

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85691-1-1.html 上篇帖子: 王家林,云计算,大数据,Hadoop,Android,iOS,HTML5,Linux----王家林一站式全系列云计算大数据Hadoop&Android&HTML 下篇帖子: Windows中的Eclipse连接Linux下的Hadoop进行开发
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表