设为首页 收藏本站
查看: 1109|回复: 0

Cassandra与Hadoop整合

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-11-11 13:42:14 | 显示全部楼层 |阅读模式

       本文采用hadoop1.0.3和Cassandra1.2.0,以Cassandra源码给的WordCount为例说明。关于hadoop与cassandra的相关知识就不废话了,直接进入主题。

       Cassandra与Hadoop整合先从cassandra读取数据,进行Map/Reduce,将结果输出到HDFS或Cassandra。


       从Cassandra读取数据这个过程需要将Cassandra数据转化成Hadoop的InputSplit。此过程是Hadoop在 JobTracker.submitJob()后调用ColumnFamilyInputFormat的getSplits(JobContextcontext)方法完成的。getSplits先获取所配置服务器的TokenRange列表。TokenRange列表与 cassandra.yaml中num_tokens配置有关,TokenRange数=节点数*num_tokens,即有4节点的集群,每个节点的num_tokens=256,那TokenRange数有4*256=1024个;随后调用
SplitCallable.getSubSplits(String keyspace, String cfName,TokenRange range, Configuration conf)将TokenRange分成更小的CfSplit;CfSplit的大小跟配置"cassandra.input.split.size"和现有cassandra数据有关,如果某个TokenRange没有数据,只返回1个CfSplit,如果有数据,根据"cassandra.input.split.size"和现有cassandra数据(具体算法暂没研究)返回CfSplit的个数。再将
CfSplit转成ColumnFamilySplit,ColumnFamilySplit是InputSplit的子类。Cassandra与Hadoop数据对应关系如下图。
  
   DSC0000.jpg
      另外,从Cassandra读取数据可以带条件,可以设置相应的SlicePredicate和IndexExpression。


  • SlicePredicate:当我们查询某一个Key下面的Value的时候,可以通过SlicePredicate来指定需要返回哪些Column。如果希望返回某几个具体的Column,那么指定column_names字段的值就可以了。如果希望按照某一个规则返回相应的Column,那么指定slice_range字段就可以了。
  • IndexExpression:使用它对2级索引做EQ,GTE,GT,LTE,LT查询
  SlicePredicate predicate = new SlicePredicate()
.setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName)));
ConfigHelper.setInputSlicePredicate(job.getConfiguration(),predicate);
if (i == 4) {
IndexExpression expr = new IndexExpression(
ByteBufferUtil.bytes("int4"), IndexOperator.EQ,
ByteBufferUtil.bytes(0));
ConfigHelper.setInputRange(job.getConfiguration(),Arrays.asList(expr));
}


         好了,Cassandra数据已经Split了,接下来是Map过程MapTask从InputSplit读取数据。MapTask先生成一个RecordReader来读取数据,这个RecordReader是调用ColumnFamilyInputFormat.getRecordReader方法来生成的,也就是ColumnFamilyRecordReader。在getRecordReader方法中初始化了Cassanra连接、查询此InputSplit(ColumnFamilySplit)的查询条件及一次查询数据大小、数据迭代器等信息。在读取数据前MapTask会生成一个MapRunnable,而MapRunnable要完成的任务就时通过RecordReader的nextKeyValue函数从split循环读取中读取<k,v>交给map函数进行处理。接下来就是MapReduce处理了。
  下面拿WordCount代码做简单介绍:
  Map:对单词计数
  public static class TokenizerMapper extends
Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private ByteBuffer sourceColumn;
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException {
}
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns,
Context context) throws IOException, InterruptedException {
for (IColumn column : columns.values()) {
String name = ByteBufferUtil.string(column.name());
String value = null;
if (name.contains(&quot;int&quot;))
value = String.valueOf(ByteBufferUtil.toInt(column.value()));
else
value = ByteBufferUtil.string(column.value());
logger.debug(&quot;read {}:{}={} from {}&quot;,
new Object[] { ByteBufferUtil.string(key), name, value,
context.getInputSplit() });
StringTokenizer itr = new StringTokenizer(value);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
}Reduce到HDFS:
  public static class ReducerToFilesystem extends
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values)
sum += val.get();
context.write(key, new IntWritable(sum));
}
}

Reduce到Cassandra:


  public static class ReducerToCassandra extends
Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {
private ByteBuffer outputKey;
protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
throws IOException, InterruptedException {
outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(
CONF_COLUMN_NAME));
}
public void reduce(Text word, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values)
sum += val.get();
context.write(outputKey,
Collections.singletonList(getMutation(word, sum)));
}
private static Mutation getMutation(Text word, int sum) {
Column c = new Column();
c.setName(Arrays.copyOf(word.getBytes(), word.getLength()));
c.setValue(ByteBufferUtil.bytes(sum));
c.setTimestamp(System.currentTimeMillis());
Mutation m = new Mutation();
m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
m.column_or_supercolumn.setColumn(c);
return m;
}
}       Reduce到Cassandra是通过ColumnFamilyOutputFormat类。ReduceTask调用ColumnFamilyOutputFormat.getRecordWriter(final TaskAttemptContext context)获得一个ColumnFamilyRecordWriter,然后通过RangeClient写入到cassandra。
  


  参考资料:hadoop_integration,MapReduce流程分析,《Cassandra实战》



版权声明:本文为博主原创文章,未经博主允许不得转载。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-137938-1-1.html 上篇帖子: Hadoop2.2.0单节点安装及测试 下篇帖子: Hadoop MapReduce 编写例子
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表