/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.sink.hbase;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;
import com.google.common.base.Charsets;
/**
 * An {@link AsyncHbaseEventSerializer} that splits each event body into fields and
 * writes every field to its own column of a single HBase row.
 *
 * <p>Configuration keys read in {@link #configure(Context)}:
 * <ul>
 *   <li>{@code payloadColumn} (default {@code "pCol"}) - comma-separated column names;
 *       spaces are removed and names are lower-cased.</li>
 *   <li>{@code incrementColumn} (default {@code "iCol"}) - column to increment.</li>
 *   <li>{@code incrementRow} (default {@code "incRow"}) - row holding the increment column.</li>
 *   <li>{@code rowPrefixCol} (default {@code "mac"}) - name of the payload column whose
 *       value is handed to the row key generator.</li>
 *   <li>{@code suffix} (default {@code "uuid"}) - one of {@code timestamp}, {@code random},
 *       {@code nano}; anything else selects the UUID key type.</li>
 * </ul>
 *
 * <p>An event whose body does not split into exactly as many fields as there are
 * configured columns produces no put requests.
 */
public class BaimiAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
  /**
   * Regular expression used to split the event body into fields. As written it matches
   * the literal two-character sequence {@code ^A}.
   */
  // NOTE(review): if the producers actually emit the Ctrl-A control character (\u0001)
  // this pattern will not match it - confirm against the upstream data format.
  private static final String PAYLOAD_COLUMN_SPLIT = "\\^A";

  private byte[] table;
  private byte[] cf;
  /** Field values of the current event, parallel to {@link #payloadColumn}; null if none. */
  private byte[][] payload;
  /** Configured column qualifiers (lower-cased, UTF-8); null if none were configured. */
  private byte[][] payloadColumn;
  private byte[] incrementColumn;
  /** Value of the {@link #rowSuffixCol} field of the current event, passed to the key generator. */
  private String rowSuffix;
  /** Lower-cased name of the column whose value is passed to the key generator. */
  private String rowSuffixCol;
  private byte[] incrementRow;
  private KeyType keyType;

  /**
   * Stores the target table and column family used for all generated requests.
   *
   * @param table the HBase table name
   * @param cf the column family
   */
  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  /**
   * Builds one put request per configured column for the current event, all sharing a
   * single freshly generated row key.
   *
   * @return the put requests; empty when no columns are configured or when the current
   *         event did not match the configured column count
   * @throws FlumeException if the row key cannot be generated
   */
  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    // Nothing to write if no columns were configured or setEvent rejected the event.
    if (payloadColumn == null || payload == null) {
      return actions;
    }
    byte[] rowKey = generateRowKey();
    // setEvent only accepts events with payload.length == payloadColumn.length,
    // so both arrays can be indexed in lockstep.
    for (int i = 0; i < payload.length; i++) {
      actions.add(new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]));
    }
    return actions;
  }

  /**
   * Generates the row key for the current event according to the configured key type.
   *
   * @return the generated row key
   * @throws FlumeException if the key generator fails
   */
  private byte[] generateRowKey() {
    try {
      switch (keyType) {
        case TS:
          return SimpleRowKeyGenerator.getTimestampKey(rowSuffix);
        case TSNANO:
          return SimpleRowKeyGenerator.getNanoTimestampKey(rowSuffix);
        case RANDOM:
          return SimpleRowKeyGenerator.getRandomKey(rowSuffix);
        default:
          return SimpleRowKeyGenerator.getUUIDKey(rowSuffix);
      }
    } catch (Exception e) {
      throw new FlumeException("Could not get row key!", e);
    }
  }

  /**
   * Builds the increment request for the configured increment row and column.
   *
   * @return a single-element list, or an empty list when no increment column is configured
   */
  @Override
  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      actions.add(new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn));
    }
    return actions;
  }

  /** No resources are held, so there is nothing to release. */
  @Override
  public void cleanUp() {
    // Intentionally empty.
  }

  /**
   * Reads the column names, increment settings and row key type from the sink context.
   *
   * @param context the serializer configuration
   */
  @Override
  public void configure(Context context) {
    String pCol = context.getString("payloadColumn", "pCol");
    String iCol = context.getString("incrementColumn", "iCol");
    // Column names are lower-cased below, so normalise this name the same way;
    // otherwise an upper-case setting could never match any column.
    rowSuffixCol = context.getString("rowPrefixCol", "mac").toLowerCase(Locale.ROOT);
    String suffix = context.getString("suffix", "uuid");

    if (pCol != null && !pCol.isEmpty()) {
      if ("timestamp".equals(suffix)) {
        keyType = KeyType.TS;
      } else if ("random".equals(suffix)) {
        keyType = KeyType.RANDOM;
      } else if ("nano".equals(suffix)) {
        keyType = KeyType.TSNANO;
      } else {
        keyType = KeyType.UUID;
      }

      // Read the column names from the configuration.
      String[] pCols = pCol.replace(" ", "").split(",");
      byte[][] columns = new byte[pCols.length][];
      for (int i = 0; i < pCols.length; i++) {
        // Column names are stored lower-cased (locale-independent).
        columns[i] = pCols[i].toLowerCase(Locale.ROOT).getBytes(Charsets.UTF_8);
      }
      payloadColumn = columns;
    }

    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  /**
   * Splits the event body into fields and stores them as the payload for the next
   * {@link #getActions()} call. State from the previous event is always discarded, so
   * an event with the wrong number of fields yields no puts instead of re-sending the
   * previous event's data.
   *
   * @param event the event to serialize
   */
  @Override
  public void setEvent(Event event) {
    // Drop the previous event's state first so a rejected event cannot leak stale data.
    payload = null;
    rowSuffix = null;
    if (payloadColumn == null) {
      return;
    }

    String body = new String(event.getBody(), Charsets.UTF_8);
    String[] fields = body.split(PAYLOAD_COLUMN_SPLIT);
    if (fields.length != payloadColumn.length) {
      return;
    }

    byte[][] values = new byte[fields.length][];
    for (int i = 0; i < fields.length; i++) {
      values[i] = fields[i].getBytes(Charsets.UTF_8);
      if (new String(payloadColumn[i], Charsets.UTF_8).equals(rowSuffixCol)) {
        // The row key is derived from this column's value (the MAC address by default).
        rowSuffix = fields[i];
      }
    }
    payload = values;
  }

  /** No component-level configuration is used by this serializer. */
  @Override
  public void configure(ComponentConfiguration conf) {
    // Intentionally empty.
  }
}
重点可以查看 setEvent、configure、getActions 函数。