bobpxp 发表于 2019-1-31 07:30:24

第16课:Spark Streaming源码解读之数据清理内幕彻底解密

/**
 * Runnable that executes one streaming [[Job]] on the scheduler's job-executor pool.
 *
 * Before running the job it tags the SparkContext (job description plus batch-time /
 * output-op-id local properties) so the UI can group the Spark jobs under their batch,
 * and it posts JobStarted/JobCompleted events to the scheduler's event loop.
 */
private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s""

      // Tag this thread's Spark jobs so the UI can attribute them to this batch.
      ssc.sc.setJobDescription(
        s"""Streaming job from $batchLinkText""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

      // Snapshot `eventLoop` into a local: `JobScheduler.stop(false)` may null the
      // field concurrently, so reading it once guards every subsequent `post` call
      // against an NPE.
      var loopSnapshot = eventLoop
      if (loopSnapshot != null) {
        loopSnapshot.post(JobStarted(job, clock.getTimeMillis()))
        // Disable the existing-output-directory check for jobs launched by the
        // streaming scheduler: during checkpoint recovery we may legitimately
        // write into a directory that already exists (see SPARK-4835).
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        // Re-read the field: the scheduler may have been stopped while the job ran.
        // When the job finishes, this JobCompleted message drives the event loop's
        // onReceive handling.
        loopSnapshot = eventLoop
        if (loopSnapshot != null) {
          loopSnapshot.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped; nothing to report.
      }
    } finally {
      // Always clear the thread-local tags so later jobs on this thread are not
      // mis-attributed to this batch.
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}
  
}


页: [1]
查看完整版本: 第16课:Spark Streaming源码解读之数据清理内幕彻底解密