设为首页 收藏本站
查看: 2235|回复: 0

[经验分享] SparkStreaming 中管理 Kafka Offsets 的几种方式

[复制链接]

尚未签到

发表于 2019-1-31 11:17:45 | 显示全部楼层 |阅读模式
  Spark Streaming集成了Kafka允许用户从Kafka中读取一个或者多个topic的数据。一个Kafka topic包含多个存储消息的分区(partition)。每个分区中的消息是顺序存储,并且用offset(可以认为是位置)来标记消息。
  Offset管理概述
  Spark Streaming集成了Kafka允许用户从Kafka中读取一个或者多个topic的数据。一个Kafka topic包含多个存储消息的分区(partition)。每个分区中的消息是顺序存储,并且用offset(可以认为是位置)来标记消息。开发者可以在他的Spark Streaming应用中通过offset来控制数据的读取位置,但是这需要好的offset的管理机制。
  Offsets管理对于保证流式应用在整个生命周期中数据的连贯性是非常有益的。举个例子,如果在应用停止或者报错退出之前没有将offset保存在持久化数据库中,那么offset rangges就会丢失。更进一步说,如果没有保存每个分区已经读取的offset,那么Spark Streaming就没有办法从上次断开(停止或者报错导致)的位置继续读取消息。
DSC0000.png   上面的图描述通常的Spark Streaming应用管理offset流程。Offsets可以通过多种方式来管理,但是一般来说遵循下面的步骤:
  · 在 Direct DStream初始化的时候,需要指定一个包含每个topic的每个分区的offset用于让Direct DStream从指定位置读取数据。
  · offsets就是步骤4中所保存的offsets位置
  · 读取并处理消息
  · 处理完之后存储结果数据
  · 用虚线圈存储和提交offset只是简单强调用户可能会执行一系列操作来满足他们更加严格的语义要求。这包括幂等操作和通过原子操作的方式存储 offset。
  · 最后,将offsets保存在外部持久化数据库如 HBase, Kafka, HDFS, and ZooKeeper中
  不同的方案可以根据不同的商业需求进行组合。Spark具有很好的编程范式允许用户很好的控制offsets的保存时机。认真考虑以下的情形:一个Spark Streaming 应用从Kafka中读取数据,处理或者转换数据,然后将数据发送到另一个topic或者其他系统中(例如其他消息系统、Hbase、Solr、DBMS等等)。在这个例子中,我们只考虑消息处理之后发送到其他系统中
  将Offsests存储在外部系统
  在这一章节中,我们将来探讨一下不同的外部持久化存储选项
  为了更好地理解这一章节中提到的内容,我们先来做一些铺垫。如果是使用 spark-streaming-kafka-0-10,那么我们建议将 enable.auto.commit 设为false。这个配置只是在这个版本生效,enable.auto.commit 如果设为true的话,那么意味着 offsets 会按照 auto.commit.interval.ms 中所配置的间隔来周期性自动提交到Kafka中。在Spark Streaming中,将这个选项设置为true的话会使得Spark应用从kafka中读取数据之后就自动提交,而不是数据处理之后提交,这不是我们想要的。所以为了更好地控制offsets的提交,我们建议将enable.auto.commit 设为false。
  Spark Streaming checkpoints
  使用Spark Streaming的checkpoint是最简单的存储方式,并且在Spark 框架中很容易实现。Spark Streaming checkpoints就是为保存应用状态而设计的,我们将路径这在HDFS上,所以能够从失败中恢复数据。
  对Kafka Stream 执行checkpoint操作使得offset保存在checkpoint中,如果是应用挂掉的话,那么SparkStreamig应用功能可以从保存的offset中开始读取消息。但是,如果是对Spark Streaming应用进行升级的话,那么很抱歉,不能checkpoint的数据没法使用,所以这种机制并不可靠,特别是在严格的生产环境中,我们不推荐这种方式。
  将offsets存储在HBase中
  HBase可以作为一个可靠的外部数据库来持久化offsets。通过将offsets存储在外部系统中,Spark Streaming应用功能能够重读或者回放任何仍然存储在Kafka中的数据。

  根据HBase的设计模式,允许应用能够以rowkey和column的结构将多个Spark Streaming应用和多个Kafka topic存放在一张表格中。在这个例子中,表格以topic名称、消费者group> DSC0001.jpg   对每一个批次的消息,使用saveOffsets()将从指定topic中读取的offsets保存到HBase中
DSC0002.jpg   在执行streaming任务之前,首先会使用getLastCommittedOffsets()来从HBase中读取上一次任务结束时所保存的offsets。该方法将采用常用方案来返回kafka topic分区offsets。
  · 情形1:Streaming任务第一次启动,从zookeeper中获取给定topic的分区数,然后将每个分区的offset都设置为0,并返回。
  · 情形2:一个运行了很长时间的streaming任务停止并且给定的topic增加了新的分区,处理方式是从zookeeper中获取给定topic的分区数,对于所有老的分区,offset依然使用HBase中所保存,对于新的分区则将offset设置为0。
  · 情形3:Streaming任务长时间运行后停止并且topic分区没有任何变化,在这个情形下,直接使用HBase中所保存的offset即可。
  在Spark Streaming应用启动之后如果topic增加了新的分区,那么应用只能读取到老的分区中的数据,新的是读取不到的。所以如果想读取新的分区中的数据,那么就得重新启动Spark Streaming应用。
DSC0003.jpg   当我们获取到offsets之后我们就可以创建一个Kafka Direct DStream
DSC0004.jpg   在完成本批次的数据处理之后调用saveOffsets()保存offsets.
DSC0005.jpg   你可以到HBase中去查看不同topic和消费者组的offset数据
DSC0006.jpg   代码示例用的以下的版本
DSC0007.jpg   将offsets存储到 ZooKeeper中
  在Spark Streaming连接Kafka应用中使用Zookeeper来存储offsets也是一种比较可靠的方式。
  在这个方案中,Spark Streaming任务在启动时会去Zookeeper中读取每个分区的offsets。如果有新的分区出现,那么他的offset将会设置在最开始的位置。在每批数据处理完之后,用户需要可以选择存储已处理数据的一个offset或者最后一个offset。此外,新消费者将使用跟旧的Kafka 消费者API一样的格式将offset保存在ZooKeeper中。因此,任何追踪或监控Zookeeper中Kafka Offset的工具仍然生效的。
  初始化Zookeeper connection来从Zookeeper中获取offsets
DSC0008.jpg   使用获取到的offsets来初始化Kafka Direct DStream
DSC0009.jpg   下面是从ZooKeeper获取一组offsets的方法
  注意: Kafka offset在ZooKeeper中的存储路径为/consumers/[groupId]/offsets/topic/[partitionId], 存储的值为offset
DSC00010.jpg   Kafka 本身
  Apache Spark 2.1.x以及spark-streaming-kafka-0-10使用新的的消费者API即异步提交API。你可以在你确保你处理后的数据已经妥善保存之后使用commitAsync API(异步提交 API)来向Kafka提交offsets。新的消费者API会以消费者组id作为唯一标识来提交offsets
  将offsets提交到Kafka中
  1stream.foreachRDD { rdd =>2 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges3 // some time later, after outputs have completed4 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)5}
  注意: commitAsync()是Spark Streaming集成kafka-0-10版本中的,在Spark文档提醒到它仍然是个实验性质的API并且存在修改的可能性。
  其他方式
  值得注意的是你也可以将offsets存储到HDFS中。但是将offsets存储到HDFS中并不是一个受欢迎的方式,因为HDFS对已ZooKeeper和Hbase来说它的延迟有点高。此外,将每批次数据的offset存储到HDFS中还会带来小文件的问题
  不管理offsets
  管理offsets对于Spark Streaming应该用来说并不是必须的。举个例子,像应用存活监控它只需要当前的数据,并不需要通过管理offsets来保证数据的不丢失。这种情形下你完全不需要管理offsets,老的kafka消费者可以将auto.offset.reset设为largest或者smallest,而新的消费者则设置为earliest or latest。
  如果你将auto.offset.reset设为smallest (earliest),那么任务会从最开始的offset读取数据,相当于重播所有数据。这样的设置会使得你的任务重启时将该topic中仍然存在的数据再读取一遍。这将由你的消息保存周期来决定你是否会重复消费。
  相反地,如果你将auto.offset.reset 设置为largest (latest),那么你的应用启动时会从最新的offset开始读取,这将导致你丢失数据。这将依赖于你的应用对数据的严格性和语义需求,这或许是个可行的方案。
  总结
  上面我们所讨论的管理offsets的方式将帮助你在Spark Streaming应用中如何有效地控制offsets。这些方法能够帮助用户在持续不断地计算和存储数据应用中更好地面对应用失效和数据恢复的场景。
  转自:http://www.raincent.com/content-85-11243-1.html


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669980-1-1.html 上篇帖子: Python: kafka-python版本差异导致的问题 下篇帖子: Kafka、RabbitMQ、RocketMQ消息中间件的对比视频教程
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表