Posted by bestu on 2018-10-21 13:55:47

Lesson 97: A Spark Streaming and Spark SQL Case Study

package com.dt.spark.streaming

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, StreamingContext}

/**
 * Analyze click logs using Spark Streaming combined with Spark SQL.
 * Assume the (simplified) click-log format of an e-commerce site is:
 * userId,itemId,clickTime
 * Requirement: rank the top 10 items by click count over the last 10 minutes
 * and display each item's name. The itemId-to-name mapping is stored in a
 * MySQL database.
 * Created by dinglq on 2016/5/4.
 */
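// For concreteness, a hypothetical input file in the monitored directory could
// contain lines such as:
//   1001,2001,2016-05-04 10:00:00
//   1001,2002,2016-05-04 10:00:05
// and the MySQL table `iteminfo` is assumed to expose at least the columns
// `itemid` and `itemname` referenced by the SQL query below.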
  
object LogAnalyzerStreamingSQL {
  // 10-minute window, recomputed every 10 seconds.
  val WINDOW_LENGTH = new Duration(600 * 1000)
  val SLIDE_INTERVAL = new Duration(10 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("LogAnalyzerStreamingSQL").setMaster("local")

    val sc = new SparkContext(sparkConf)

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
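    // The implicits import enables the rdd.toDF() conversion on RDDs of case
    // classes used inside foreachRDD below.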
  
    // Load the itemInfo table from the MySQL database over JDBC.
    val itemInfoDF = sqlContext.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://spark-master:3306/spark",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "iteminfo",
      "user" -> "root",
      "password" -> "vincent"
    )).load()
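    // Assumes the MySQL JDBC driver (mysql-connector-java) is on the classpath;
    // the URL, table name, and credentials are specific to the author's setup.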
  

  
    itemInfoDF.registerTempTable("itemInfo")

    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    // Monitor a local directory; files moved into it become new batches.
    val logLinesDStream = streamingContext.textFileStream("D:/logs_incoming")

    val accessLogsDStream = logLinesDStream.map(AccessLog.parseLogLine).cache()

    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)
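    // Each foreachRDD invocation below therefore sees the last WINDOW_LENGTH
    // (10 minutes) of logs, recomputed every SLIDE_INTERVAL (10 seconds).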
  

  
    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.isEmpty()) {
        println("No logs received in this time interval")
      } else {
        // Expose this window's logs to Spark SQL, then join against itemInfo
        // to attach item names and rank items by click count.
        accessLogs.toDF().registerTempTable("accessLogs")
        val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM itemInfo a JOIN " +
          " (SELECT itemId,COUNT(*) cnt FROM accessLogs GROUP BY itemId) b " +
          " ON (a.itemid=b.itemId) ORDER BY cnt DESC LIMIT 10 "
        val topTenClickItemLast10Minutes = sqlContext.sql(sqlStr)

        // Show the top-ten table for this window; it could also be persisted
        // to HDFS as a Parquet file, as sketched below.
        topTenClickItemLast10Minutes.show()
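        // A minimal sketch of that persistence step, assuming Spark 1.4+ and a
        // hypothetical output path (neither is part of the original lesson):
        // topTenClickItemLast10Minutes.write.mode("append").parquet("hdfs://spark-master:9000/user/spark/topten")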
  
      }
    })
  

  
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
  

  
case class AccessLog(userId: String, itemId: String, clickTime: String)
  

  
object AccessLog {

  def parseLogLine(log: String): AccessLog = {
    val logInfo = log.split(",")
    if (logInfo.length == 3) {
      AccessLog(logInfo(0), logInfo(1), logInfo(2))
    } else {
      // Malformed lines become a placeholder record instead of being dropped.
      AccessLog("0", "0", "0")
    }
  }
}
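
To exercise the job locally, note that textFileStream only picks up files that newly appear in the monitored directory, so the usual trick is to write the log file elsewhere and then move it in. Below is a minimal feeder sketch; the paths and sample records are hypothetical and not part of the original lesson:

import java.nio.file.{Files, Paths, StandardCopyOption}

object LogFeeder {
  def main(args: Array[String]): Unit = {
    // Hypothetical sample clicks in the userId,itemId,clickTime format.
    val lines = "1001,2001,2016-05-04 10:00:00\n1001,2002,2016-05-04 10:00:05\n"
    val tmp = Paths.get("D:/tmp/clicks.log")
    Files.write(tmp, lines.getBytes("UTF-8"))
    // Move (rather than copy) so the file appears atomically to textFileStream.
    Files.move(tmp, Paths.get("D:/logs_incoming/clicks.log"), StandardCopyOption.ATOMIC_MOVE)
  }
}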

