|
一.概述
- 客户端创建包含过滤器Filter的Scan。
- Scan通过RPC被发送给RegionServer,在服务器端执行过滤操作。
- Scan的作用域是region,所以一个RegionServer有多个region的话,Scan将被发送到每个region。
二.Filter工作流程
你必须知道的是,HBase里的一行数据对应一或者多个KeyValue。再来看看流程。
- boolean fiterRowKey(byte[] buffer, int offset, int length):检查rowkey。返回true代表被过滤掉,false进入下个方法。
- ReturnCode fiterKeyValue(KeyValue v):检查rowkey下特定的某一个KeyValue。它有五种返回值,常见的ReturnCode.INCLUDE代表结果中包含这个KeyValue,Skip代表不包含,继续下处理一个KeyValue。
- void filterRow(List<KeyValue> ignored):ignored是2里面被过滤掉的KeyValue集合。
- boolean filterRow():返回true代表过滤掉当前行。
- void reset():迭代每一个新的RowKey(步骤1)前调用此方法。
- boolean filterAllRemaining():返回true时代表终止整个扫描操作。如用户找到了需要的所有数据,就在这里可以返回true。
三.例子
说这么多,不如一个实际的例子来的方便。
用户表rowkey为用户名称,cf:age代表年龄,cf:pw代表密码。现在需要找出用户,有三个限制条件
- 看到zzy这个用户直接pass,因为它是个技术狂人
。
- 找出密码长度小于4的用户,用户信息不需要包含pw列。
- 只找出满足上述条件的两个用户。
public class PasswordStrengthFilter extends FilterBase {
private int len;
private int limit;
private int rowsAccepted = 0;
private boolean filterRow = false;
public PasswordStrengthFilter() {
super();
}
public PasswordStrengthFilter(int len, int limit) {
this.len = len;
this.limit = limit;
}
@Override
public boolean filterRowKey(byte[] buffer, int offset, int length) {
String rowkey = Bytes.toString(buffer, offset, length);
boolean iszzy = rowkey.equals("zzy");
return iszzy;
}
public ReturnCode filterKeyValue(KeyValue v) {
if (Bytes.toString(v.getQualifier()).equals("pw")) {
if (v.getValueLength() >= len) {
this.filterRow = true;
}
return ReturnCode.SKIP;
}
return ReturnCode.INCLUDE;
}
@Override
public void filterRow(List<KeyValue> ignored) {
}
public boolean filterRow() {
if(!filterRow){
rowsAccepted ++ ;
}
return this.filterRow;
}
@Override
public boolean filterAllRemaining() {
return rowsAccepted >= limit;
}
public void reset() {
this.filterRow = false;
}
public void write(DataOutput out) throws IOException {
out.writeInt(len);
out.writeInt(limit);
}
public void readFields(DataInput in) throws IOException {
this.len = in.readInt();
this.limit = in.readInt();
}
public static String getShortDes(KeyValue kv){
return "KeyValue["+Bytes.toString(kv.getRow()) + "\t" + Bytes.toString(kv.getFamily()) + ":" + Bytes.toString(kv.getQualifier()) + "=" + Bytes.toString(kv.getValue())+"]";
}
}
new PasswordStrengthFilter(4, 2)的测试结果:请各位自己分析下每一行,每一个KeyValue是怎么被fiter处理的。
input:
addData("david", TN, "age", "12");
addData("david", TN, "pw", "p1");
addData("zzy", TN, "age", "13");
addData("zzy", TN, "pw", "p2");
addData("zzz", TN, "age", "18");
addData("zzz", TN, "pw", "p456");
addData("zzzz", TN, "age", "18");
addData("zzzz", TN, "pw", "p4");
addData("zzzzz", TN, "age", "11");
addData("zzzzz", TN, "pw", "p6");
output:
KeyValue[davidcf:age=12]
KeyValue[zzzzcf:age=18]
四.总结
- 一般返回true代表被过滤掉。
- 一个region上的所有数据共享一个Filter,并不是每来一行数据就new一个Filter来处理。所以例子中的rowsAccepted得以生效,变量filterRow的状态需要重置。
附件为测试代码和一个根据rowkey分页的RowPaginationFilter |
|
|