[Experience Sharing] Lesson 5: Walking Through the Runtime Source Code of the Spark Streaming Framework with a Single Case Study

[复制链接]

尚未签到

发表于 2019-1-31 08:15:57 | 显示全部楼层 |阅读模式
  package com.dt.spark.streaming_scala
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.hive.HiveContext
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  /**
   * Use Spark Streaming + Spark SQL to compute, continuously and online, the hottest items in each
   * category of an e-commerce site -- for example the three hottest phones in the "phone" category
   * and the three hottest TVs in the "TV" category. This kind of computation is extremely common in
   * real production systems.
   *
   * @author DT大数据梦工厂
   * Sina Weibo: http://weibo.com/ilovepains/
   *
   * Implementation: Spark Streaming + Spark SQL. The reason Spark Streaming can use Spark SQL, MLlib,
   * GraphX and the rest of Spark is that interfaces such as foreachRDD and transform expose the RDD
   * underlying each batch; with the RDD as the common foundation, every other Spark API can be called
   * directly, as if it were an ordinary API call.
   * The input data format is assumed to be: user item category, e.g. "Rocky Samsung Android".
   */
  object OnlineTheTop3ItemForEachCategory2DB {
  def main(args: Array[String]){
  /**
   * Step 1: Create SparkConf, the object that holds the Spark program's runtime configuration.
   * setMaster sets the URL of the Spark master the program connects to; setting it to "local"
   * runs the program locally, which suits beginners on very modest machines (e.g. only 1 GB of RAM).
   */
  val conf = new SparkConf() // create the SparkConf object
  conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") // application name, shown in the monitoring UI
  conf.setMaster("spark://Master:7077") // run against a standalone Spark cluster rather than in local mode
  // Set the batchDuration, which controls how often jobs are generated, and create the StreamingContext,
  // the entry point of every Spark Streaming program
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint("/root/Documents/SparkApps/checkpoint")
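  // Receive text lines from a socket server at "Master":9999. This creates a receiver-based input
  // DStream (SocketInputDStream) backed by the SocketReceiver examined later in this post; the
  // default storage level is MEMORY_AND_DISK_SER_2.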
  val userClickLogsDStream = ssc.socketTextStream("Master", 9999)
  val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
  (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))
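  // Count clicks per "category_item" key over a 60-second window that slides every 20 seconds.
  // The second function (_ - _) is the inverse reduce: Spark incrementally adds the batches entering
  // the window and subtracts the ones leaving it instead of recomputing the whole window, which is
  // why the checkpoint directory set above is required.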
  val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,
  _-_, Seconds(60), Seconds(20))
  categoryUserClickLogsDStream.foreachRDD { rdd => {
  if (rdd.isEmpty()) {
  println("No data inputted!!!")
  } else {
  val categoryItemRow = rdd.map(reducedItem => {
  val category = reducedItem._1.split("_")(0)
  val item = reducedItem._1.split("_")(1)
  val click_count = reducedItem._2
  Row(category, item, click_count)
  })
  val structType = StructType(Array(
  StructField("category", StringType, true),
  StructField("item", StringType, true),
  StructField("click_count", IntegerType, true)
  ))
  val hiveContext = new HiveContext(rdd.context)
  val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)
  categoryItemDF.registerTempTable("categoryItemTable")
  val resultDataFrame = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
  " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
  " WHERE rank <= 3")
  // For demonstration we simply print the Top 3 items of each category; a production job would
  // typically write the result to an external store (e.g. a database) at this point.
  resultDataFrame.show()
  }
  }
  }
  ssc.start()
  ssc.awaitTermination()
  }
  }
So much for the application; now for the framework source. When ssc.start() is called, the JobScheduler starts the ReceiverTracker, which launches each Receiver as a long-running task on an executor. Once the receiver is up it registers itself back with the driver, and ReceiverTracker.registerReceiver (abridged below, signature shown for context) decides whether to accept it by checking that the receiver started on the executor chosen by the scheduling policy.
  private def registerReceiver(
  streamId: Int,
  typ: String,
  host: String,
  executorId: String,
  receiverEndpoint: RpcEndpointRef,
  senderAddress: RpcAddress): Boolean = {
  if (isTrackerStopping || isTrackerStopped) {
  return false
  }
  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
  val acceptableExecutors = if (scheduledLocations.nonEmpty) {
  // This receiver is registering and it's scheduled by
  // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
  scheduledLocations.get
  } else {
  // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
  // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
  scheduleReceiver(streamId)
  }
  def isAcceptable: Boolean = acceptableExecutors.exists {
  case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
  case loc: TaskLocation => loc.host == host
  }
  if (!isAcceptable) {
  // Refuse it since it's scheduled to a wrong executor
  false
  } else {
  val name = s"${typ}-${streamId}"
  val receiverTrackingInfo = ReceiverTrackingInfo(
  streamId,
  ReceiverState.ACTIVE,
  scheduledLocations = None,
  runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
  name = Some(name),
  endpoint = Some(receiverEndpoint))
  receiverTrackingInfos.put(streamId, receiverTrackingInfo)
  listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
  logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
  true
  }
  }
  Receiver startup: take ssc.socketTextStream("localhost", 9999) as the example, which creates a SocketReceiver. Internally, onStart() launches a thread that connects to the socket server, reads the socket's data and stores it.
  private[streaming]
  class SocketReceiver[T: ClassTag](
  host: String,
  port: Int,
  bytesToObjects: InputStream => Iterator[T],
  storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {
  def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
  setDaemon(true)
  override def run() { receive() }
  }.start()
  }
  def onStop() {
  // There is nothing much to do as the thread calling receive()
  // is designed to stop by itself if isStopped() returns false
  }
  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
  var socket: Socket = null
  try {
  logInfo("Connecting to " + host + ":" + port)
  socket = new Socket(host, port)
  logInfo("Connected to " + host + ":" + port)
  val iterator = bytesToObjects(socket.getInputStream())
  while(!isStopped && iterator.hasNext) {
  store(iterator.next)
  }
  if (!isStopped()) {
  restart("Socket data stream had no more data")
  } else {
  logInfo("Stopped receiving")
  }
  } catch {
  case e: java.net.ConnectException =>
  restart("Error connecting to " + host + ":" + port, e)
  case NonFatal(e) =>
  logWarning("Error receiving data", e)
  restart("Error receiving data", e)
  } finally {
  if (socket != null) {
  socket.close()
  logInfo("Closed socket to " + host + ":" + port)
  }
  }
  }
  }
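SocketReceiver is one concrete implementation of the general Receiver contract: onStart() starts the worker thread(s) and must return quickly, store() hands records to the ReceiverSupervisor for block generation, and isStopped()/restart()/stop() drive the lifecycle. A minimal, hypothetical sketch of the same contract (CounterReceiver and the data it emits are invented for illustration):
  import scala.util.control.NonFatal
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver
  // Hypothetical receiver that emits an incrementing counter once per second. It follows the
  // same pattern as SocketReceiver: a daemon thread started in onStart() keeps calling store()
  // until isStopped() becomes true.
  class CounterReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {
  override def onStart(): Unit = {
  new Thread("Counter Receiver") {
  setDaemon(true)
  override def run(): Unit = {
  var i = 0L
  try {
  while (!isStopped()) {
  store(s"tick-$i") // hand the record to Spark for block generation
  i += 1
  Thread.sleep(1000)
  }
  } catch {
  case NonFatal(e) => restart("Error in counter receiver", e)
  }
  }
  }.start()
  }
  override def onStop(): Unit = {
  // nothing to do: the loop above checks isStopped() and exits on its own
  }
  }
Such a receiver is plugged in with ssc.receiverStream(new CounterReceiver(StorageLevel.MEMORY_ONLY)), after which it is scheduled, launched and registered through exactly the ReceiverTracker path shown above.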
Back on the driver, the JobGenerator fires once per batchDuration, asks the DStreamGraph to generate that batch's jobs, and wraps them in a JobSet that is handed to JobScheduler.submitJobSet:
  def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
  logInfo("No jobs added for time " + jobSet.time)
  } else {
  listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
  jobSets.put(jobSet.time, jobSet)
  jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
  logInfo("Added jobs for time " + jobSet.time)
  }
  }
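Each Job in the set runs on the jobExecutor thread pool. Its size comes from the (undocumented but long-standing) spark.streaming.concurrentJobs setting and defaults to 1, so by default the output operations of successive batches execute strictly one at a time. A small sketch of raising it, with the caveat that concurrent batches may then interleave and finish out of order:
  // Allow up to two batches' output operations to run concurrently in the jobExecutor pool.
  val concurrentConf = new SparkConf()
  .setAppName("OnlineTheTop3ItemForEachCategory2DB")
  .set("spark.streaming.concurrentJobs", "2")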

Every submitted Job is wrapped in a JobHandler before it reaches that pool; run() tags the underlying Spark jobs with the batch time and output-operation id, posts JobStarted/JobCompleted events, and finally calls job.run():
  private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._
  def run() {
  try {
  val formattedTime = UIUtils.formatBatchTime(
  job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
  val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
  val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
  ssc.sc.setJobDescription(
  s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
  ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
  ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
  // We need to assign `eventLoop` to a temp variable. Otherwise, because
  // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
  // it's possible that when `post` is called, `eventLoop` happens to null.
  var _eventLoop = eventLoop
  if (_eventLoop != null) {
  _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
  // Disable checks for existing output directories in jobs launched by the streaming
  // scheduler, since we may need to write output to an existing directory during checkpoint
  // recovery; see SPARK-4835 for more details.
  PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
  job.run()
  }
  _eventLoop = eventLoop
  if (_eventLoop != null) {
  _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
  }
  } else {
  // JobScheduler has been stopped.
  }
  } finally {
  ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
  ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
  }
  }
  }
  }
The Job itself is a thin wrapper around an output operation's closure; run() simply captures the closure's outcome in a Try:
  private[streaming]
  class Job(val time: Time, func: () => _) {
  private var _id: String = _
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null
  private var _callSite: CallSite = null
  private var _startTime: Option[Long] = None
  private var _endTime: Option[Long] = None
  def run() {
  _result = Try(func())
  }
  // ... (setters for id/outputOpId and accessors for the result, call site and timings omitted)
  }
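To close the loop, the func that a Job wraps is produced by the output DStream. For foreachRDD (and print, which is built on it), ForEachDStream.generateJob builds that closure from the user's (RDD, Time) => Unit function for each batch time. Abridged from the Spark source (details vary slightly between versions):
  // The closure built here is exactly the `func` that Job.run() wraps in a Try.
  override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
  case Some(rdd) =>
  val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
  foreachFunc(rdd, time) // the user's foreachRDD closure, applied to this batch's RDD
  }
  Some(new Job(time, jobFunc))
  case None => None
  }
  }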

