[Experience Share] Hadoop example: RandomWriter

  Reference: http://www.hadooper.cn/dct/page/65778
1. Overview
  The RandomWriter example uses Map/Reduce to write random data into DFS. Each map takes a single file name as its input and writes random BytesWritable keys and values to a DFS sequence file. The maps emit no intermediate output, so no reduce phase runs. The amount of data generated is configurable through the following variables:

  

Name                              Default       Description
test.randomwriter.maps_per_host   10            number of map tasks run on each node
test.randomwrite.bytes_per_map    1073741824    amount of data generated by each map task
test.randomwrite.min_key          10            minimum size of the key in bytes
test.randomwrite.max_key          1000          maximum size of the key in bytes
test.randomwrite.min_value        0             minimum size of the value
test.randomwrite.max_value        20000         maximum size of the value

  test.randomwriter.maps_per_host is the number of map tasks run on each worker node (datanode). With the defaults and a single data node there are 10 maps, each producing 1 GB of data, so 10 GB in total are written to HDFS. My test environment has two worker nodes, but I want each worker node to run only one map task.
  I originally took test.randomwrite.bytes_per_map to be the size of the generated test file, with a default of 1 GB = 1*1024*1024*1024. But after I changed it to 1*1024*1024, the output file was still 1 GB, which puzzled me. (PS 2011-11-2: I now know this parameter is the amount of data each map task generates; setting it to 1*1024*1024 should mean each map task produces 1 MB.) (PS 2011-11-3: changing test.randomwrite.bytes_per_map did not change how much data each map task produced; it stayed at 1 GB no matter what value I set. Changing test.randomwriter.maps_per_host does take effect, though; I verified values of 1 and 2. Open question: where does test.randomwrite.bytes_per_map have to be set so that it actually changes the amount of data each map task produces?)
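  One possible explanation, offered here as an assumption rather than a verified diagnosis: these properties are read from the client-side JobConf when the job is submitted (see the run() method in the listing below), so editing them only in the cluster's site files may never reach the submitted job. Setting them in the submitting configuration should take effect, either with -D on the command line (the job supports GenericOptionsParser) or programmatically, as in this minimal driver sketch (the class name RandomWriterDriver is my own):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.RandomWriter;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver: set the random-writer properties on the client-side
// configuration before the job is submitted, then delegate to RandomWriter.
public class RandomWriterDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // 1 MB of data per map task (the value being tested in the discussion above)
    conf.setLong("test.randomwrite.bytes_per_map", 1L * 1024 * 1024);
    // one map task per worker node
    conf.setInt("test.randomwriter.maps_per_host", 1);
    // args[0] is the HDFS output directory, e.g. /home/hadoop/rand
    System.exit(ToolRunner.run(conf, new RandomWriter(), args));
  }
}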
2. Code example
  In this version the defaults are test.randomwrite.bytes_per_map = 1*1024*1024 and test.randomwriter.maps_per_host = 1.
  
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Date;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * This program uses map/reduce to just run a distributed job where there is
 * no interaction between the tasks and each task writes a large unsorted
 * random binary sequence file of BytesWritable.
 * In order for this program to generate data for terasort with 10-byte keys
 * and 90-byte values, have the following config:
 * <configuration>
 *   <property>
 *     <name>test.randomwrite.min_key</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.max_key</name>
 *     <value>10</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.min_value</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.max_value</name>
 *     <value>90</value>
 *   </property>
 *   <property>
 *     <name>test.randomwrite.total_bytes</name>
 *     <value>1099511627776</value>
 *   </property>
 * </configuration>
 *
 * Equivalently, {@link RandomWriter} also supports all the above options
 * and ones supported by {@link GenericOptionsParser} via the command-line.
 */
public class RandomWriter extends Configured implements Tool {

  /**
   * User counters
   */
  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }

  /**
   * A custom input format that creates virtual inputs of a single string
   * for each map.
   */
  static class RandomInputFormat implements InputFormat<Text, Text> {

    /**
     * Generate the requested number of file splits, with the filename
     * set to the filename of the output file.
     */
    public InputSplit[] getSplits(JobConf job,
                                  int numSplits) throws IOException {
      InputSplit[] result = new InputSplit[numSplits];
      Path outDir = FileOutputFormat.getOutputPath(job);
      for (int i = 0; i < result.length; ++i) {
        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
                                  (String[]) null);
      }
      return result;
    }

    /**
     * Return a single record (filename, "") where the filename is taken from
     * the file split.
     */
    static class RandomRecordReader implements RecordReader<Text, Text> {
      Path name;
      public RandomRecordReader(Path p) {
        name = p;
      }
      public boolean next(Text key, Text value) {
        if (name != null) {
          key.set(name.getName());
          name = null;
          return true;
        }
        return false;
      }
      public Text createKey() {
        return new Text();
      }
      public Text createValue() {
        return new Text();
      }
      public long getPos() {
        return 0;
      }
      public void close() {}
      public float getProgress() {
        return 0.0f;
      }
    }

    public RecordReader<Text, Text> getRecordReader(InputSplit split,
                                                    JobConf job,
                                                    Reporter reporter) throws IOException {
      return new RandomRecordReader(((FileSplit) split).getPath());
    }
  }

  static class Map extends MapReduceBase
      implements Mapper<WritableComparable, Writable,
                        BytesWritable, BytesWritable> {

    private long numBytesToWrite;
    private int minKeySize;
    private int keySizeRange;
    private int minValueSize;
    private int valueSizeRange;
    private Random random = new Random();
    private BytesWritable randomKey = new BytesWritable();
    private BytesWritable randomValue = new BytesWritable();

    private void randomizeBytes(byte[] data, int offset, int length) {
      for (int i = offset + length - 1; i >= offset; --i) {
        data[i] = (byte) random.nextInt(256);
      }
    }

    /**
     * Given an output filename, write a bunch of random records to it.
     */
    public void map(WritableComparable key,
                    Writable value,
                    OutputCollector<BytesWritable, BytesWritable> output,
                    Reporter reporter) throws IOException {
      int itemCount = 0;
      while (numBytesToWrite > 0) {
        int keyLength = minKeySize +
          (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
        randomKey.setSize(keyLength);
        randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
        int valueLength = minValueSize +
          (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
        randomValue.setSize(valueLength);
        randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
        output.collect(randomKey, randomValue);
        numBytesToWrite -= keyLength + valueLength;
        reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
        reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
        if (++itemCount % 200 == 0) {
          reporter.setStatus("wrote record " + itemCount + ". " +
                             numBytesToWrite + " bytes left.");
        }
      }
      reporter.setStatus("done with " + itemCount + " records.");
    }

    /**
     * Save the values out of the configuration that we need to write
     * the data.
     */
    @Override
    public void configure(JobConf job) {
      numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
                                    1*1024*1024);
      minKeySize = job.getInt("test.randomwrite.min_key", 10);
      keySizeRange =
        job.getInt("test.randomwrite.max_key", 1000) - minKeySize;
      minValueSize = job.getInt("test.randomwrite.min_value", 0);
      valueSizeRange =
        job.getInt("test.randomwrite.max_value", 20000) - minValueSize;
    }
  }

  /**
   * This is the main routine for launching a distributed random write job.
   * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
   * The reduce doesn't do anything.
   *
   * @throws IOException
   */
  public int run(String[] args) throws Exception {
    if (args.length == 0) {
      System.out.println("Usage: writer <out-dir>");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    Path outDir = new Path(args[0]);
    JobConf job = new JobConf(getConf());

    job.setJarByClass(RandomWriter.class);
    job.setJobName("random-writer");
    FileOutputFormat.setOutputPath(job, outDir);

    job.setOutputKeyClass(BytesWritable.class);
    job.setOutputValueClass(BytesWritable.class);

    job.setInputFormat(RandomInputFormat.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(IdentityReducer.class);
    job.setOutputFormat(SequenceFileOutputFormat.class);

    JobClient client = new JobClient(job);
    ClusterStatus cluster = client.getClusterStatus();
    int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 1);
    long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
                                             1*1024*1024);
    if (numBytesToWritePerMap == 0) {
      System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
      return -2;
    }
    long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes",
         numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
    int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
    if (numMaps == 0 && totalBytesToWrite > 0) {
      numMaps = 1;
      job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
    }

    job.setNumMapTasks(numMaps);
    System.out.println("Running " + numMaps + " maps.");

    // reducer NONE
    job.setNumReduceTasks(0);

    Date startTime = new Date();
    System.out.println("Job started: " + startTime);
    JobClient.runJob(job);
    Date endTime = new Date();
    System.out.println("Job ended: " + endTime);
    System.out.println("The job took " +
                       (endTime.getTime() - startTime.getTime()) / 1000 +
                       " seconds.");

    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);
    System.exit(res);
  }
}

Output:
  
11/10/17 13:27:46 WARN conf.Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively
Running 2 maps.
Job started: Mon Oct 17 13:27:47 CST 2011
11/10/17 13:27:47 INFO mapred.JobClient: Running job: job_201110171322_0001
11/10/17 13:27:48 INFO mapred.JobClient:  map 0% reduce 0%
11/10/17 13:29:58 INFO mapred.JobClient:  map 50% reduce 0%
11/10/17 13:30:05 INFO mapred.JobClient:  map 100% reduce 0%
11/10/17 13:30:07 INFO mapred.JobClient: Job complete: job_201110171322_0001
11/10/17 13:30:07 INFO mapred.JobClient: Counters: 8
11/10/17 13:30:07 INFO mapred.JobClient:   Job Counters
11/10/17 13:30:07 INFO mapred.JobClient:     Launched map tasks=3
11/10/17 13:30:07 INFO mapred.JobClient:   org.apache.hadoop.examples.RandomWriter$Counters
11/10/17 13:30:07 INFO mapred.JobClient:     BYTES_WRITTEN=2147504078
11/10/17 13:30:07 INFO mapred.JobClient:     RECORDS_WRITTEN=204528
11/10/17 13:30:07 INFO mapred.JobClient:   FileSystemCounters
11/10/17 13:30:07 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=2154580318
11/10/17 13:30:07 INFO mapred.JobClient:   Map-Reduce Framework
11/10/17 13:30:07 INFO mapred.JobClient:     Map input records=2
11/10/17 13:30:07 INFO mapred.JobClient:     Spilled Records=0
11/10/17 13:30:07 INFO mapred.JobClient:     Map input bytes=0
11/10/17 13:30:07 INFO mapred.JobClient:     Map output records=204528
Job ended: Mon Oct 17 13:30:07 CST 2011
The job took 140 seconds.

Two files were produced on HDFS under /home/hadoop/rand: part-00000 (1 GB, replication 3) and part-00001 (1 GB, replication 3).
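  To sanity-check the generated data, a small reader like the one below could be used (this is my own sketch, not part of the original example; the path /home/hadoop/rand matches the run above, everything else is illustrative). It lists the part files and reads back a few BytesWritable records from one sequence file using the same old mapred-era API as the example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;

// Sketch for inspecting RandomWriter output: list the part files, then print
// the key/value sizes of the first few records of part-00000.
public class InspectRandomOutput {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path outDir = new Path("/home/hadoop/rand");   // output directory used in this post
    for (FileStatus st : fs.listStatus(outDir)) {
      System.out.println(st.getPath() + "  " + st.getLen() + " bytes");
    }
    SequenceFile.Reader reader =
        new SequenceFile.Reader(fs, new Path(outDir, "part-00000"), conf);
    BytesWritable key = new BytesWritable();
    BytesWritable value = new BytesWritable();
    for (int i = 0; i < 3 && reader.next(key, value); i++) {
      System.out.println("record " + i + ": key " + key.getLength()
          + " bytes, value " + value.getLength() + " bytes");
    }
    reader.close();
  }
}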


  
