The source data format is as follows (tab-separated fields, with the timestamp in the last field):

20130512	1	-1	-1	13802	1	2013-05-12 07:26:22
20130512	1	-1	-1	13802	1	2013-05-12 11:18:24

The result we want is for the data to be read directly from HDFS and written into HBase, with no reduce phase. The code is as follows:
package WebsiteAnalysis;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Map2Hdfs {
    public static final String NAME = "ImportFromFile";

    public enum Counters {
        LINES
    }

    static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {
        private byte[] family = null;
        private byte[] qualifier = null;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Parse the target column ("family" or "family:qualifier") from the job configuration.
            String column = context.getConfiguration().get("conf.column");
            byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
            family = colkey[0];
            if (colkey.length > 1) {
                qualifier = colkey[1];
            }
        }

        @Override
        public void map(LongWritable offset, Text line, Context context) throws IOException {
            try {
                // Each input line is tab-separated; the last field is the timestamp.
                String[] lineArr = line.toString().split("\t");
                // Use the byte offset of the line within the input file as the row key.
                Put put = new Put(Bytes.toBytes(offset + ""));
                put.add(family, Bytes.toBytes("time"), Bytes.toBytes(lineArr[lineArr.length - 1]));
                // Emit the Put directly; TableOutputFormat writes it into HBase.
                context.write(new ImmutableBytesWritable(Bytes.toBytes(offset + "")), put);
                context.getCounter(Counters.LINES).increment(1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        conf.set("conf.column", "cf");
        String inputPath = "/dsap/middata/lj/ooxx/pv";
        Job job = new Job(conf, "TestMap2Hdfs");

        job.setJarByClass(Map2Hdfs.class);
        job.setMapperClass(ImportMapper.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "TestMap2Hdfs");
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        // Map-only job: no reduce phase.
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(inputPath + "/" + otherArgs[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
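The job writes into an existing HBase table named TestMap2Hdfs with column family cf, so the table has to be created before the job is submitted (the second reference below discusses this table-creation approach). Below is a minimal sketch of pre-creating it with the HBaseAdmin client API from the same HBase generation as the job code; the class name CreateTestMap2HdfsTable is hypothetical and not part of the original post.

package WebsiteAnalysis;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Hypothetical helper class, a sketch only: creates the target table if it is missing.
public class CreateTestMap2HdfsTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("TestMap2Hdfs")) {
            // Table with a single column family "cf", matching conf.column in the job.
            HTableDescriptor desc = new HTableDescriptor("TestMap2Hdfs");
            desc.addFamily(new HColumnDescriptor("cf"));
            admin.createTable(desc);
        }
        admin.close();
    }
}

Once the table exists and the job has run, each input line ends up as one row keyed by its byte offset in the source file, with the timestamp stored under cf:time. Note that rerunning the job over the same file simply overwrites those rows, since the offsets do not change.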
REF:
http://stackoverflow.com/questions/11061854/hadoop-writing-to-hbase-directly-from-the-mapper
http://blog.sina.com.cn/s/blog_62a9902f0101904h.html (writing to HBase by creating the table first)
HBase/HDFS MapReduce data read/write summaries:
http://blog.pureisle.net/archives/1938.html (summary of the various HBase/HDFS MapReduce read/write scenarios)
http://blog.iyunv.com/kirayuan/article/details/7001278 (sample code for copying an HBase table)