设为首页 收藏本站
查看: 758|回复: 0

[经验分享] 第97课:Spark Streaming 结合Spark SQL 案例

[复制链接]

尚未签到

发表于 2018-10-21 13:55:47 | 显示全部楼层 |阅读模式
package com.dt.spark.streaming  

  
import org.apache.spark.sql.SQLContext
  
import org.apache.spark.{SparkContext, SparkConf}
  
import org.apache.spark.streaming.{StreamingContext, Duration}
  

  
/**
  
* 使用SparkStreaming结合SparkSQL对日志进行分析。
  
* 假设电商网站点击日志格式(简化)如下:
  
* userid,itemId,clickTime
  
* 需求:处理10分钟内item点击次数排序Top10,并且将商品名称显示出来。商品itemId与商品名称的对应关系存放在MySQL数据库中
  
* Created by dinglq on 2016/5/4.
  
*/
  
object LogAnalyzerStreamingSQL {
  
  val WINDOW_LENGTH = new Duration(600 * 1000)
  
  val SLIDE_INTERVAL = new Duration(10 * 1000)
  

  
  def main(args: Array[String]) {
  
    val sparkConf = new SparkConf().setAppName("LogAnalyzerStreamingSQL").setMaster("local[4]")
  

  
    val sc = new SparkContext(sparkConf)
  

  
    val sqlContext = new SQLContext(sc)
  
    import sqlContext.implicits._
  
    //从数据库中加载itemInfo表
  
    val itemInfoDF = sqlContext.read.format("jdbc").options(Map(
  
      "url"-> "jdbc:mysql://spark-master:3306/spark",
  
      "driver"->"com.mysql.jdbc.Driver",
  
      "dbtable"->"iteminfo",
  
      "user"->"root",
  
      "password"-> "vincent"
  
      )).load()
  

  
    itemInfoDF.registerTempTable("itemInfo")
  

  
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
  

  
    val logLinesDStream = streamingContext.textFileStream("D:/logs_incoming")
  

  
    val accessLogsDStream = logLinesDStream.map(AccessLog.parseLogLine).cache()
  

  
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)
  

  
    windowDStream.foreachRDD(accessLogs => {
  
      if (accessLogs.isEmpty()) {
  
        println("No logs received in this time interval")
  
      } else {
  
        accessLogs.toDF().registerTempTable("accessLogs")
  
        val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM itemInfo a JOIN " +
  
          " (SELECT itemId,COUNT(*) cnt FROM accessLogs GROUP BY itemId) b " +
  
          " ON (a.itemid=b.itemId) ORDER BY cnt DESC LIMIT 10 "
  
        val topTenClickItemLast10Minus = sqlContext.sql(sqlStr)
  

  
        // Persist top ten table for this window to HDFS as parquet file
  

  
        topTenClickItemLast10Minus.show()
  
      }
  
    })
  

  
    streamingContext.start()
  
    streamingContext.awaitTermination()
  
  }
  
}
  

  
case class AccessLog(userId: String, itemId: String, clickTime: String) {
  
}
  

  
object AccessLog {
  

  
  def parseLogLine(log: String): AccessLog = {
  
    val logInfo = log.split(",")
  
    if (logInfo.length == 3) {
  
      AccessLog(logInfo(0),logInfo(1), logInfo(2))
  
    }
  
    else {
  
      AccessLog("0","0","0")
  
    }
  
  }
  
}



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-624557-1-1.html 上篇帖子: 数据库 sql xml类型 查询及操作 下篇帖子: Apache Spark在SnappyData支持即时SQL分析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表