设为首页 收藏本站
查看: 869|回复: 0

[经验分享] Spark 读取Hbase表数据并实现类似groupByKey操作

[复制链接]

尚未签到

发表于 2019-1-30 11:50:47 | 显示全部楼层 |阅读模式
  

  一、概述
程序运行环境很重要,本次测试基于:
hadoop-2.6.5
spark-1.6.2
hbase-1.2.4
zookeeper-3.4.6
jdk-1.8
废话不多说了,直接上需求
  

  Andy column=baseINFO:age,  value=21
  Andy column=baseINFO:gender,  value=0
  Andy column=baseINFO:telphone_number, value=110110110
  Tom  column=baseINFO:age, value=18
  Tom  column=baseINFO:gender, value=1
  Tom  column=baseINFO:telphone_number, value=120120120
如上表所示,将之用spark进行分组,达到这样的效果:
[Andy,(21,0,110110110)]
[Tom,(18,1,120120120)]
需求比较简单,主要是熟悉一下程序运行过程
二、具体代码
  

package com.union.bigdata.spark.hbase;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.mapreduce.TableSplit;import org.apache.hadoop.hbase.util.Base64;import org.apache.hadoop.hbase.util.Bytes;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableInputFormat;import org.apache.spark.api.java.JavaPairRDD;import org.apache.hadoop.hbase.protobuf.ProtobufUtil;import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple10;import scala.Tuple2;import java.io.IOException;import java.util.ArrayList;import java.util.List;public class ReadHbase {
    private static String appName = "ReadTable";
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();
    //we can also run it at local:"local[3]"  the number 3 means 3 threads
        sparkConf.setMaster("spark://master:7077").setAppName(appName);
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("baseINFO"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));
        String scanToString = "";
        try {
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            scanToString = Base64.encodeBytes(proto.toByteArray());
        } catch (IOException io) {
            System.out.println(io);
        }
        for (int i = 0; i < 2; i++) {
            try {
                String tableName = "VIPUSER";
                conf.set(TableInputFormat.INPUT_TABLE, tableName);
                conf.set(TableInputFormat.SCAN, scanToString);
                //get the Result of query from the Table of Hbase
                JavaPairRDD hBaseRDD = jsc.newAPIHadoopRDD(conf,
                        TableInputFormat.class, ImmutableBytesWritable.class,
                        Result.class);
                //group by row key like : [(Andy,110,21,0),(Tom,120,18,1)]
                JavaPairRDD art_scores = hBaseRDD.mapToPair(
                        new PairFunction() {
                            @Override
                            public Tuple2 call(Tuple2 results) {
                                List list = new ArrayList();
                                byte[] telphone_number = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
                                byte[] age = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
                                byte[] gender = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));
                //the type of storage at Hbase is Byte Array,so we must let it be normal like Int,String and so on
                                list.add(Integer.parseInt(Bytes.toString(telphone_number)));
                                list.add(Integer.parseInt(Bytes.toString(age)));
                                list.add(Integer.parseInt(Bytes.toString(gender)));
                                return new Tuple2(Bytes.toString(results._1().get()), list);
                            }
                        }
                );
                //switch to Cartesian product
                JavaPairRDD cart = art_scores.cartesian(art_scores);
                //use Row Key to delete the repetition from the last step "Cartesian product"  
                JavaPairRDD cart2 = cart.filter(
                        new Function() {
                            public Boolean call(Tuple2 tuple2Tuple2Tuple2) throws Exception {
                                return tuple2Tuple2Tuple2._1()._1().compareTo(tuple2Tuple2Tuple2._2()._1()) < 0;
                            }
                        }
                );
                System.out.println("Create the List 'collect'...");
        //get the result we need
                 List collect = cart2.collect();
                 System.out.println("Done..");
                 System.out.println(collect.size() > i ? collect.get(i):"STOP");
                 if (collect.size() > i ) break;
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}  

  三、程序运行过程分析
1、spark自检以及Driver和excutor的启动过程
实例化一个SparkContext(若在spark2.x下,这里初始化的是一个SparkSession对象),这时候启动SecurityManager线程去检查用户权限,OK之后创建sparkDriver线程,spark底层远程通信模块(akka框架实现)启动并监听sparkDriver,之后由sparkEnv对象来注册BlockManagerMaster线程,由它的实现类对象去监测运行资源
2、zookeeper与Hbase的自检和启动
第一步顺利完成之后由sparkContext对象去实例去启动程序访问Hbase的入口,触发之后zookeeper完成自己的一系列自检活动,包括用户权限、操作系统、数据目录等,一切OK之后初始化客户端连接对象,之后由Hbase的ClientCnxn对象来建立与master的完整连接
3、spark job 的运行
程序开始调用spark的action类方法,比如这里调用了collect,会触发job的执行,这个流程网上资料很详细,无非就是DAGScheduler搞的一大堆事情,连带着出现一大堆线程,比如TaskSetManager、TaskScheduler等等,最后完成job,返回结果集
4、结束程序
正确返回结果集之后,sparkContext利用反射调用stop()方法,这之后也会触发一系列的stop操作,主要线程有这些:BlockManager,ShutdownHookManager,后面还有释放actor的操作等等,最后一切结束,临时数据和目录会被删除,资源会被释放




运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669589-1-1.html 上篇帖子: Spark 安装与实战 下篇帖子: Spark On Yarn(HDFS HA)详细配置
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表