Posted by zcl_ccc on 2015-11-11 14:45:47

Importing data from HDFS directly into HBase in the Hadoop Mapper phase

  The source data format is as follows:

20130512    1   -1-113802   1   2013-05-12 07:26:22
20130512    1   -1-113802   1   2013-05-12 11:18:24

What we want is for the data to be read straight from HDFS and written into HBase, with no reduce phase. The code is as follows:
package WebsiteAnalysis;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Map2Hdfs {
    public static final String NAME = "ImportFromFile";

    public enum Counters {
        LINES
    }

    static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {
        private byte[] family = null;
        private byte[] qualifier = null;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // "conf.column" holds either "family" or "family:qualifier"
            String column = context.getConfiguration().get("conf.column");
            byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
            family = colkey[0];
            if (colkey.length > 1) {
                qualifier = colkey[1];
            }
        }

        @Override
        public void map(LongWritable offset, Text line, Context context) throws IOException {
            try {
                String[] lineArr = line.toString().split("\t");
                // Use the byte offset of the input line as the row key
                Put put = new Put(Bytes.toBytes(offset + ""));
                // The array index was lost in the original post's formatting;
                // index 4 is the timestamp column in the sample data above
                put.add(family, Bytes.toBytes("time"), Bytes.toBytes(lineArr[4]));
                context.write(new ImmutableBytesWritable(Bytes.toBytes(offset + "")), put);
                context.getCounter(Counters.LINES).increment(1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Tell the mapper which column family to write to
        conf.set("conf.column", "cf");
        String inputPath = "/dsap/middata/lj/ooxx/pv";
        Job job = new Job(conf, "TestMap2Hdfs");

        job.setJarByClass(Map2Hdfs.class);
        job.setMapperClass(ImportMapper.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "TestMap2Hdfs");
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        // Map-only job: no reduce phase, Puts go straight to HBase
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(inputPath + "/" + otherArgs[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
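
One prerequisite worth noting: TableOutputFormat only writes to the table, it does not create it, so the TestMap2Hdfs table with column family cf must already exist before the job is submitted. Below is a minimal sketch of a one-off setup step (not part of the original post), assuming the old-style HBaseAdmin API from the same HBase generation as the code above:

package WebsiteAnalysis;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateMap2HdfsTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // Same table name and column family that Map2Hdfs writes to
        if (!admin.tableExists("TestMap2Hdfs")) {
            HTableDescriptor desc = new HTableDescriptor("TestMap2Hdfs");
            desc.addFamily(new HColumnDescriptor("cf"));
            admin.createTable(desc);
        }
        admin.close();
    }
}

The same thing can be done in one line from the hbase shell: create 'TestMap2Hdfs', 'cf'. Also note that because the input directory is hard-coded and otherArgs[0] is appended to it, the job expects the subdirectory name as its single command-line argument, e.g. hadoop jar <your-jar> WebsiteAnalysis.Map2Hdfs <subdir> (jar name is a placeholder).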

REF:

http://stackoverflow.com/questions/11061854/hadoop-writing-to-hbase-directly-from-the-mapper
http://blog.sina.com.cn/s/blog_62a9902f0101904h.html - writing by creating the table in code
http://blog.pureisle.net/archives/1938.html - HBase/HDFS MapReduce read/write: a summary of the common cases
http://blog.iyunv.com/kirayuan/article/details/7001278 - sample code for copying an HBase table