设为首页 收藏本站
查看: 714|回复: 0

[经验分享] Hadoop学习笔记(四):HBase

[复制链接]

尚未签到

发表于 2015-7-12 12:59:15 | 显示全部楼层 |阅读模式
  HBase是在一个HDFS上开发的面向列的分布式数据库。HBase不是关系型数据库,不支持SQL。
  HTable一些基本概念


  • Row key
  行主键, HBase不支持条件查询和Order by等查询,读取记录只能按Row key(及其range)或全表扫描,因此Row key需要根据业务来设计以利用其存储排序特性(Table按Row key字典序排序如1,10,100,11,2)提高性能。


  • Column Family(列族)
  在表创建时声明,每个Column Family为一个存储单元。在上例中设计了一个HBase表blog,该表有两个列族:article和author。


  • Column(列)
  HBase的每个列都属于一个列族,以列族名为前缀,如列article:title和article:content属于article列族,author:name和author:nickname属于author列族。
Column不用创建表时定义即可以动态新增,同一Column Family的Columns会群聚在一个存储单元上,并依Column key排序,因此设计时应将具有相同I/O特性的Column设计在一个Column Family上以提高性能。


  • Timestamp
  HBase通过row和column确定一份数据,这份数据的值可能有多个版本,不同版本的值按照时间倒序排序,即最新的数据排在最前面,查询时默认返回最新版本。如上例中row key=1的author:nickname值有两个版本,分别为1317180070811对应的“一叶渡江”和1317180718830对应的“yedu”(对应到实际业务可以理解为在某时刻修改了nickname为yedu,但旧值仍然存在)。Timestamp默认为系统当前时间(精确到毫秒),也可以在写入数据时指定该值。


  • Value
  每个值通过4个键唯一索引,tableName+RowKey+ColumnKey+Timestamp=>value,例如上例中{tableName=’blog’,RowKey=’1’,ColumnName=’author:nickname’,Timestamp=’ 1317180718830’}索引到的唯一值是“yedu”。


  • 存储类型
  TableName 是字符串
RowKey 和 ColumnName 是二进制值(Java 类型 byte[])
Timestamp 是一个 64 位整数(Java 类型 long)
value 是一个字节数组(Java类型 byte[])。
  SHELL操作
  HBase提供了丰富的访问接口,其中HBase Shell是常用的便捷方式。
     · HBase Shell
     · Java clietn API
     · Jython、Groovy DSL、Scala
     · REST
     · Thrift(Ruby、Python、Perl、C++…)
     · MapReduce
     · Hive/Pig
     创建表
> create 'test','data1','data2'
   Column Family是schema的一部分,而Column不是。这里的data1和data2是Column Family。
     增加记录
>put 'test','1','data1:name','luc'
>put 'test','1','data1:age','24'
>put 'test','1','data2:height','170cm'
>put 'test','1','data2:weight','65kg'
>put 'test','1','data1:nickname','vichao'
Column完全动态扩展,每行可以有不同的Columns。(ps:好像有点胖啊,要减肥~~)
     根据RowKey查询
> get 'test','1'
  HTable按RowKey字典序(1,10,100,11,2)自动排序,每行包含任意数量的Columns,Columns按ColumnKey(data1:age,data1:name,data1:nickname,data2:height,data2:weight)自动排序。
     更新操作


  • 查询值:
  > get 'test','1','data1:nickname'


  • 更新nickname为'vic':
  > put 'test','1','data1:nickname','vic'


  • 查询更新后的结果:(返回的将是vic)
  > get ‘blog’,’1’,’data1:nickname’

  知识点回顾:查询默认返回最近的值。


  • 查询nickname的多个(本示例为2个)版本值
  > get 'test','1',{COLUMN => 'data1:nickname',VERSIONS => 2}
知识点回顾:每个Column可以有任意数量的Values,按Timestamp倒序自动排序。


  • 如何只查询到以前的旧版本呢,需要借助Timestamp
  >get 'test','1',{COLUMN=>'data1:nickname',TIMESTAMP=>1373707746997}
  知识点回顾:TabelName+RowKey+Column+Timestamp=>Value
    删除记录


  • delete只能删除一个column
  >delete 'test','1','data1:nickname'


  • 删除RowKey的所有column用deleteall
  >deleteall 'test','1'
   删除表
练习完毕,把练习表删了吧,删除之前需要先disable
>disable 'test'
>drop 'test'
  
  JAVA API操作
  javaAPI操作还是比较简单的,各种api类和api函数,直接上代码了



import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.conf.Configuration;
public class ExampleClient {
public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
// Create table
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor htd = new HTableDescriptor("test");
HColumnDescriptor hcd = new HColumnDescriptor("data");
htd.addFamily(hcd);
admin.createTable(htd);
byte[] tablename = htd.getName();
HTableDescriptor[] tables = admin.listTables();
if (tables.length != 1 && Bytes.equals(tablename, tables[0].getName())) {
throw new IOException("Failed create of table!");
}
// Run some operations
HTable table = new HTable(conf, tablename);
byte[] row1 = Bytes.toBytes("row1");
Put p1 = new Put(row1);
byte[] databytes = Bytes.toBytes("data");
p1.add(databytes, Bytes.toBytes("1"), Bytes.toBytes("value1"));
table.put(p1);
Get g = new Get(row1);
Result result = table.get(g);
System.out.println("GET:" + result);
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
try {
for (Result scannerResult : scanner) {
System.out.println("Scan:" + scannerResult);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
scanner.close();
}
// Drop the table
admin.disableTable(tablename);
admin.deleteTable(tablename);

}
}
  
  这里要注意的一点是连接到HBase需要将将HBase环境的hbase-site.xml文件引入到工程中,就像jdbc的数据库连接一样,不然是连不上hbase滴。

   Configuration conf = HBaseConfiguration.create();//这行代码会从hbase-site.xml中读取配置信息
  HBase MapReduce操作
  直接上代码了:



import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
public class testHBaseMR {
public static class Mapper extends
TableMapper {
public Mapper() {
}
@Override
public void map(ImmutableBytesWritable row, Result values,
Context context) throws IOException {
ImmutableBytesWritable value = null;
String[] tags = null;
for (KeyValue kv : values.list()) {
if ("author".equals(Bytes.toString(kv.getFamily()))
&& "nickname".equals(Bytes.toString(kv.getQualifier()))) {
value = new ImmutableBytesWritable(kv.getValue());
}
if ("article".equals(Bytes.toString(kv.getFamily()))
&& "tags".equals(Bytes.toString(kv.getQualifier()))) {
tags = Bytes.toString(kv.getValue()).split(",");
}
}
for (int i = 0; i < tags.length; i++) {
ImmutableBytesWritable key = new ImmutableBytesWritable(
Bytes.toBytes(tags.toLowerCase()));
try {
context.write(key, value);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}
}
public static class Reducer
extends
TableReducer {
@Override
public void reduce(ImmutableBytesWritable key, Iterable values,
Context context) throws IOException, InterruptedException {
String friends = "";
for (ImmutableBytesWritable val : values) {
friends += (friends.length() > 0 ? "," : "")
+ Bytes.toString(val.get());
}
Put put = new Put(key.get());
put.add(Bytes.toBytes("person"), Bytes.toBytes("nicknames"),
Bytes.toBytes(friends));
context.write(key, put);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf = HBaseConfiguration.create(conf);
Job job = new Job(conf, "HBase_FindFriend");
job.setJarByClass(testHBaseMR.class);
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));
scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));
TableMapReduceUtil
.initTableMapperJob("blog", scan, Mapper.class,
ImmutableBytesWritable.class,
ImmutableBytesWritable.class, job);
TableMapReduceUtil
.initTableReducerJob("tag_friend", Reducer.class, job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
TableMapReduceUtil.initTableMapperJob;TableMapReduceUtil.initTableReducerJob 用来指定map和reduce。相当于hbase的MR操作工具类吧~
  
   优化
  针对行的键按数据排列的次序进行随机处理;
  每个任务只实例化一个HTable对象;
  HTable.put(put)执行put操作时不使用任何缓冲。可以通过使用HTable.setAutoFlush(false),设置禁用自动刷入,并设置写缓冲大小,同时在任务的最后设置HTable.flushCommits()或者HTable.close(),以确保缓冲中最后没有剩下的未刷入的数据;
  设计行键的时候要多加考虑,可以使用复合键,如果键是整数,则应该使用二进制形式以节省存储空间;
  HBase和RDBMS的比较
  HBase:
  HBase是一个分布式的面向列的数据存储系统;
  HBase表可以很高和很宽(数十亿行,数百万列);
  水平分区并在数千个商用机节点上自动复制;
  表的模式是物理存储的直接反映;
  不支持SQL;
  不需要强一致性和参照完整性;
  不支持索引;行是顺序存储的,每行中的列也是,不存在索引膨胀的问题,插入性能和表的大小无关
  事务好像仅仅支持针对某一行的一系列Put/Delete操作。不同行、不同表间的操作是无法放在一个事务中的;
  HBase不支持条件查询和Order by等查询,读取记录只能按Row key(及其range)或全表扫描;
  没有内置对连接操作的支持,但是由于表的宽度可以很大,一个宽行可以容下一个主键相关的所有数据,并不需要使用连接;
  
  RDBMS:
  支持SQL;
  需要强一致性和参照完整性;
  支持事务,索引等;
  HBase的特性
  没有索引,行是顺序存储的,每行中的列也是,不存在索引膨胀的问题,插入性能和表的大小无关;
  自动分区,在表增长的时候,表会自动分裂成区域,并分布到可用的节点上;
  线性扩展和对于新节点的自动处理,增加一个节点,把它指向现有的集群,并运行Regionserver,区域会自动重新进行平衡,负载会均匀分布;
  普通商用硬件支持;
  容错,大量节点意味着每个节点的重要性并不突出,不用担心单个节点失效;
  批处理,支持MapReduce操作,可以用全并行的分布式作业来处理数据;
  
  
  
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85853-1-1.html 上篇帖子: [测试] 试用Hadoop 2.2中的HDFS NFS 下篇帖子: 【Hadoop】HDFS
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表