设为首页 收藏本站
查看: 1599|回复: 0

[经验分享] spark读取kafka数据流

[复制链接]

尚未签到

发表于 2019-1-30 10:48:42 | 显示全部楼层 |阅读模式
spark读取kafka数据流提供了两种方式createDstream和createDirectStream。
两者区别如下:
1、KafkaUtils.createDstream
构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )
使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上
A、创建一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故如果增加特定主体分区数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量
B、对于不同的group和topic可以使用多个receivers创建不同的DStream
C、如果启用了WAL,需要设置存储级别,即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)

2.KafkaUtils.createDirectStream
区别Receiver接收数据,这种方式定期地从kafka的topic+partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,使用的是kafka的简单消费者api
优点:
A、 简化并行,不需要多个kafka输入流,该方法将会创建和kafka分区一样的rdd个数,而且会从kafka并行读取。
B、高效,这种方式并不需要WAL,WAL模式需要对数据复制两次,第一次是被kafka复制,另一次是写到wal中
C、恰好一次语义(Exactly-once-semantics),传统的读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,存在数据丢失的可能性是zookeeper中和ssc的偏移量不一致。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具

  

  public void adclick(){
  SparkConf conf = new SparkConf()
  .setAppName("")
  .setMaster("");
  JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(10));
  jssc.checkpoint("");
  Map kafkaParams = new HashMap();
  kafkaParams.put("metadata.broker.list", ConfigurationManager.getProperty("metadata.broker.list"));
  String kafkaTopics = ConfigurationManager.getProperty("kafkaTopics");
  String[] kafkaTopicsSplits = kafkaTopics.split(",");
  Set tops = new HashSet();
  for(String xx:kafkaTopicsSplits){
  tops.add(xx);
  }
  JavaPairInputDStream adRealTimeDStream = KafkaUtils.
  createDirectStream(
  jssc,
  String.class,
  String.class,
  StringDecoder.class,
  StringDecoder.class,
  kafkaParams,
  tops);
  
  
  jssc.start();
  jssc.awaitTermination();
  jssc.close();
  
  }
  





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669522-1-1.html 上篇帖子: SPARK启动历史任务查看 下篇帖子: spark写orc格式文件
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表