[Experience Sharing] A custom HBase sink class for Flume

Posted on 2015-11-27 20:34:01
  

References (with thanks to the original authors)

  • http://ydt619.blog.iyunv.com/316163/1230586


  • https://blogs.apache.org/flume/entry/streaming_data_into_apache_hbase

Example configuration file for Flume 1.5

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/scut/Downloads/testFlume
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
# HBase table name
a1.sinks.k1.table = Router
# HBase column family
a1.sinks.k1.columnFamily = log
# HBase column names, comma-separated
a1.sinks.k1.serializer.payloadColumn = serviceTime,browerOS,clientTime,screenHeight,screenWidth,url,userAgent,mobileDevice,gwId,mac
# Serializer class that processes each event
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.BaimiAsyncHbaseEventSerializer
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
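Key properties to note

  • a1.sinks.k1.serializer.payloadColumn lists all of the column names.
  • a1.sinks.k1.serializer sets the Flume serializer class. The BaimiAsyncHbaseEventSerializer class reads the value of payloadColumn and splits it on commas to obtain the full list of column names.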
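
The column-name parsing described above can be tried on its own, without Flume on the classpath. The following is a minimal sketch: `PayloadColumnParser` and its methods are hypothetical names introduced for illustration, and the only logic taken from the post is "strip spaces, split on commas, lowercase, encode as UTF-8".

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper, not part of Flume: it mirrors how the serializer is
// described as turning the payloadColumn setting into column qualifiers.
final class PayloadColumnParser {

    // Remove spaces, split on commas, and lowercase each column name.
    static List<String> parseNames(String payloadColumn) {
        List<String> names = new ArrayList<>();
        for (String name : payloadColumn.replace(" ", "").split(",")) {
            names.add(name.toLowerCase(Locale.ROOT));
        }
        return names;
    }

    // Encode each name as UTF-8 bytes, the form an HBase qualifier takes.
    static byte[][] toQualifiers(List<String> names) {
        byte[][] qualifiers = new byte[names.size()][];
        for (int i = 0; i < names.size(); i++) {
            qualifiers[i] = names.get(i).getBytes(StandardCharsets.UTF_8);
        }
        return qualifiers;
    }

    public static void main(String[] args) {
        List<String> names = parseNames("serviceTime, browerOS, mac");
        System.out.println(names); // prints [servicetime, broweros, mac]
        System.out.println(toQualifiers(names).length + " qualifiers");
    }
}
```

Because the names are lowercased, the columns end up in HBase as servicetime, browseros-style lowercase qualifiers rather than in the mixed case used in the configuration file.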



The BaimiAsyncHbaseEventSerializer class

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.sink.hbase;

import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import com.google.common.base.Charsets;

public class BaimiAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] cf;
  // Field values of the current event, one entry per column.
  private byte[][] payload;
  // Column qualifiers read from the payloadColumn setting.
  private byte[][] payloadColumn;
  // Regex for the field delimiter: a literal '^' followed by 'A'.
  private final String payloadColumnSplit = "\\^A";
  private byte[] incrementColumn;
  // Value placed at the front of the row key (passed as the prefix argument).
  private String rowSuffix;
  // Name of the column whose value is used for rowSuffix.
  private String rowSuffixCol;
  private byte[] incrementRow;
  private KeyType keyType;

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
      byte[] rowKey;
      try {
        switch (keyType) {
          case TS:
            rowKey = SimpleRowKeyGenerator.getTimestampKey(rowSuffix);
            break;
          case TSNANO:
            rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowSuffix);
            break;
          case RANDOM:
            rowKey = SimpleRowKeyGenerator.getRandomKey(rowSuffix);
            break;
          default:
            rowKey = SimpleRowKeyGenerator.getUUIDKey(rowSuffix);
            break;
        }
        // Emit one PutRequest per column, pairing each column with its value.
        for (int i = 0; i < this.payload.length; i++) {
          PutRequest putRequest =
              new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]);
          actions.add(putRequest);
        }
      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }

  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      AtomicIncrementRequest inc =
          new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn);
      actions.add(inc);
    }
    return actions;
  }

  @Override
  public void cleanUp() {
    // TODO Auto-generated method stub
  }

  @Override
  public void configure(Context context) {
    String pCol = context.getString("payloadColumn", "pCol");
    String iCol = context.getString("incrementColumn", "iCol");
    rowSuffixCol = context.getString("rowPrefixCol", "mac");
    String suffix = context.getString("suffix", "uuid");
    if (pCol != null && !pCol.isEmpty()) {
      if (suffix.equals("timestamp")) {
        keyType = KeyType.TS;
      } else if (suffix.equals("random")) {
        keyType = KeyType.RANDOM;
      } else if (suffix.equals("nano")) {
        keyType = KeyType.TSNANO;
      } else {
        keyType = KeyType.UUID;
      }
      // Read the column names from the configuration file.
      String[] pCols = pCol.replace(" ", "").split(",");
      payloadColumn = new byte[pCols.length][];
      for (int i = 0; i < pCols.length; i++) {
        // Convert each column name to lower case.
        payloadColumn[i] = pCols[i].toLowerCase().getBytes(Charsets.UTF_8);
      }
    }
    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow =
        context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  @Override
  public void setEvent(Event event) {
    String strBody = new String(event.getBody());
    String[] subBody = strBody.split(this.payloadColumnSplit);
    // Only events whose field count matches the column count are accepted;
    // otherwise payload keeps the value from the previous event.
    if (subBody.length == this.payloadColumn.length) {
      this.payload = new byte[subBody.length][];
      for (int i = 0; i < subBody.length; i++) {
        this.payload[i] = subBody[i].getBytes(Charsets.UTF_8);
        if ((new String(this.payloadColumn[i]).equals(this.rowSuffixCol))) {
          // The row key prefix is the value of one column, the MAC address by default.
          this.rowSuffix = subBody[i];
        }
      }
    }
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    // TODO Auto-generated method stub
  }
}
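The key functions to look at are setEvent, configure, and getActions.

  • configure: reads the Flume configuration, including the column names and the row key suffix type.
  • setEvent: takes the body of the Flume event and stores its fields in the payload array.
  • getActions: creates the PutRequest instances, filling each one with the row key, column family, column, and value.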
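
The per-event logic of setEvent and getActions can be illustrated without Flume or HBase. The sketch below is an assumption-laden stand-in, not the serializer itself: `EventBodySketch`, `toCells`, and `rowKey` are hypothetical names, the sample body is made up, and the row key simply follows the "column value followed by a UUID" shape that the default key type is expected to produce.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the serializer's per-event logic; no Flume or
// HBase classes are used, so it runs on a plain JDK.
final class EventBodySketch {

    // Same regex as payloadColumnSplit: a literal '^' followed by 'A'.
    static final String SPLIT_REGEX = "\\^A";

    // Pair each column name with the field at the same position.
    // Returns null when the field count does not match the column count.
    static Map<String, String> toCells(String[] columns, String body) {
        String[] fields = body.split(SPLIT_REGEX);
        if (fields.length != columns.length) {
            return null;
        }
        Map<String, String> cells = new LinkedHashMap<>();
        for (int i = 0; i < columns.length; i++) {
            cells.put(columns[i], fields[i]);
        }
        return cells;
    }

    // UUID-style row key: the chosen column's value followed by a random UUID.
    static String rowKey(Map<String, String> cells, String keyColumn) {
        return cells.get(keyColumn) + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String[] columns = {"url", "gwid", "mac"};
        // Made-up sample body with three fields.
        Map<String, String> cells =
            toCells(columns, "http://example.com/^Agw01^A00:11:22:33:44:55");
        System.out.println(cells);
        System.out.println(rowKey(cells, "mac"));
    }
}
```

Two details are worth keeping in mind. The pattern matches the two visible characters `^A`; if the input actually uses the Ctrl-A control character, the pattern would have to be `"\u0001"` instead. Also, `String.split` drops trailing empty strings, so a line whose last fields are empty yields fewer fields than columns and fails the length check.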

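Compiling the source and running

     Once the custom BaimiAsyncHbaseEventSerializer class is written, the next step is to compile the source to produce the flume-ng-hbase-sink.*.jar package and use it to replace the original flume-ng-hbase-sink.*.jar in Flume.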

  • Download the Flume 1.5 source, extract it, and go to the directory flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/
  • Copy the BaimiAsyncHbaseEventSerializer class above into that directory.
  • Go to flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/ and run the Maven build command: mvn install -Dmaven.test.skip=true
  • The build produces flume-ng-hbase-sink-1.5.0.jar in flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/target. Use this jar to replace the one under $FLUME_HOME/lib.
  • Start Flume with the command: flume-ng agent -c . -f conf/spoolDir.conf -n a1 -Dflume.root.logger=INFO,console
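
To exercise the running agent, a test file with correctly delimited lines is needed in the spooling directory. The following is a rough sketch under several assumptions: `SpoolFileWriter` is a hypothetical helper, every field value is made up, and temporary directories stand in for the real spoolDir (/home/scut/Downloads/testFlume in the configuration above). The file is written to a staging directory first and then moved, since the spooling directory source expects files to be complete when they appear.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Collections;

// Hypothetical helper for producing a sample input file; not part of Flume.
final class SpoolFileWriter {

    // Join the fields with the literal two-character delimiter "^A".
    static String toLine(String... fields) {
        return String.join("^A", fields);
    }

    // Write the line to a staging file, then move it into the spool
    // directory so the source does not pick up a half-written file.
    static Path drop(Path stagingDir, Path spoolDir, String fileName, String line)
            throws IOException {
        Path staged = stagingDir.resolve(fileName);
        Files.write(staged, Collections.singletonList(line), StandardCharsets.UTF_8);
        return Files.move(staged, spoolDir.resolve(fileName),
                StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        // Temporary directories stand in for the real staging and spool paths.
        Path staging = Files.createTempDirectory("staging");
        Path spool = Files.createTempDirectory("spool");
        // Ten made-up values, in the order of the payloadColumn setting.
        String line = toLine("1448627641", "Android", "1448627640", "1280", "720",
                "http://example.com/", "ExampleAgent/1.0", "phone", "gw01",
                "00:11:22:33:44:55");
        System.out.println(drop(staging, spool, "sample.log", line));
    }
}
```

The line must contain exactly as many fields as there are names in payloadColumn, otherwise the serializer's length check in setEvent rejects it.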








  
