mapper:
public class OfferMapper extends TableMapper<Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/**
* rowkey,value,context
*/
public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException,
InterruptedException {
String titleValue = Bytes.toString(value.getValue(Bytes.toBytes("title"), null));
for(KeyValue kv : value.list()){
System.out.println(kv.getValue().toString());
word.set(kv.getValue());
context.write(word, one);
}
}
}
reducer:
/**
 * Reducer that adds up every count received for a key and writes the key together
 * with that total.
 */
public class OfferReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    /** Output value instance, reused for every key handled by this reducer. */
    private IntWritable total = new IntWritable();

    /**
     * Sums the counts for one key and writes a single (key, sum) pair.
     *
     * @param key the key being reduced
     * @param values the counts collected for the key
     * @param context the task context that receives the output pair
     * @throws IOException if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int running = 0;
        for (IntWritable partial : values) {
            running = running + partial.get();
        }
        total.set(running);
        context.write(key, total);
    }
}
public class OfferTask {
final static String NAME = "OfferTask";
/**
* Sets up the actual job.
*
* @param conf The current configuration.
* @param args The command line parameters.
* @return The newly created job.
* @throws IOException When setting up the job fails.
*/
public static Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
String tableName = args[0];
// Path inputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(OfferTask.class);