本文采用hadoop1.0.3和Cassandra1.2.0,以Cassandra源码给的WordCount为例说明。关于hadoop与cassandra的相关知识就不废话了,直接进入主题。
Cassandra与Hadoop整合先从cassandra读取数据,进行Map/Reduce,将结果输出到HDFS或Cassandra。
从Cassandra读取数据这个过程需要将Cassandra数据转化成Hadoop的InputSplit。此过程是Hadoop在 JobTracker.submitJob()后调用ColumnFamilyInputFormat的getSplits(JobContextcontext)方法完成的。getSplits先获取所配置服务器的TokenRange列表。TokenRange列表与 cassandra.yaml中num_tokens配置有关,TokenRange数=节点数*num_tokens,即有4节点的集群,每个节点的num_tokens=256,那TokenRange数有4*256=1024个;随后调用
SplitCallable.getSubSplits(String keyspace, String cfName,TokenRange range, Configuration conf)将TokenRange分成更小的CfSplit;CfSplit的大小跟配置"cassandra.input.split.size"和现有cassandra数据有关,如果某个TokenRange没有数据,只返回1个CfSplit,如果有数据,根据"cassandra.input.split.size"和现有cassandra数据(具体算法暂没研究)返回CfSplit的个数。再将
CfSplit转成ColumnFamilySplit,ColumnFamilySplit是InputSplit的子类。Cassandra与Hadoop数据对应关系如下图。
另外,从Cassandra读取数据可以带条件,可以设置相应的SlicePredicate和IndexExpression。
- SlicePredicate:当我们查询某一个Key下面的Value的时候,可以通过SlicePredicate来指定需要返回哪些Column。如果希望返回某几个具体的Column,那么指定column_names字段的值就可以了。如果希望按照某一个规则返回相应的Column,那么指定slice_range字段就可以了。
- IndexExpression:使用它对2级索引做EQ,GTE,GT,LTE,LT查询。
SlicePredicate predicate = new SlicePredicate()
.setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName)));
ConfigHelper.setInputSlicePredicate(job.getConfiguration(),predicate);
if (i == 4) {
IndexExpression expr = new IndexExpression(
ByteBufferUtil.bytes("int4"), IndexOperator.EQ,
ByteBufferUtil.bytes(0));
ConfigHelper.setInputRange(job.getConfiguration(),Arrays.asList(expr));
}
好了,Cassandra数据已经Split了,接下来是Map过程MapTask从InputSplit读取数据。MapTask先生成一个RecordReader来读取数据,这个RecordReader是调用ColumnFamilyInputFormat.getRecordReader方法来生成的,也就是ColumnFamilyRecordReader。在getRecordReader方法中初始化了Cassanra连接、查询此InputSplit(ColumnFamilySplit)的查询条件及一次查询数据大小、数据迭代器等信息。在读取数据前MapTask会生成一个MapRunnable,而MapRunnable要完成的任务就时通过RecordReader的nextKeyValue函数从split循环读取中读取<k,v>交给map函数进行处理。接下来就是MapReduce处理了。
下面拿WordCount代码做简单介绍:
Map:对单词计数
public static class TokenizerMapper extends
Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private ByteBuffer sourceColumn;
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException {
}
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns,
Context context) throws IOException, InterruptedException {
for (IColumn column : columns.values()) {
String name = ByteBufferUtil.string(column.name());
String value = null;
if (name.contains("int"))
value = String.valueOf(ByteBufferUtil.toInt(column.value()));
else
value = ByteBufferUtil.string(column.value());
logger.debug("read {}:{}={} from {}",
new Object[] { ByteBufferUtil.string(key), name, value,
context.getInputSplit() });
StringTokenizer itr = new StringTokenizer(value);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
}Reduce到HDFS:
public static class ReducerToFilesystem extends
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values)
sum += val.get();
context.write(key, new IntWritable(sum));
}
}
Reduce到Cassandra:
public static class ReducerToCassandra extends
Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {
private ByteBuffer outputKey;
protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
throws IOException, InterruptedException {
outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(
CONF_COLUMN_NAME));
}
public void reduce(Text word, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values)
sum += val.get();
context.write(outputKey,
Collections.singletonList(getMutation(word, sum)));
}
private static Mutation getMutation(Text word, int sum) {
Column c = new Column();
c.setName(Arrays.copyOf(word.getBytes(), word.getLength()));
c.setValue(ByteBufferUtil.bytes(sum));
c.setTimestamp(System.currentTimeMillis());
Mutation m = new Mutation();
m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
m.column_or_supercolumn.setColumn(c);
return m;
}
} Reduce到Cassandra是通过ColumnFamilyOutputFormat类。ReduceTask调用ColumnFamilyOutputFormat.getRecordWriter(final TaskAttemptContext context)获得一个ColumnFamilyRecordWriter,然后通过RangeClient写入到cassandra。
参考资料:hadoop_integration,MapReduce流程分析,《Cassandra实战》
版权声明:本文为博主原创文章,未经博主允许不得转载。 |