darkpoon 发表于 2017-3-2 08:16:57

spark scala mysql 语法

  https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html
  https://github.com/yu-iskw/spark-dataframe-introduction/blob/master/doc/dataframe-introduction.md
  You need to register a temporary table first:

// Register the DataFrame as a temporary table so it can be queried by name in SQL.
// (assumes `event` is a DataFrame and `context` is its SQLContext — defined elsewhere)
event.registerTempTable("event")
// Run a Spark SQL query against the temp table, keep the first 5 rows, and print them.
context.sql("SELECT created_at, repo.name AS `repo.name`, actor.id, type FROM event WHERE type = 'PullRequestEvent'").limit(5).show()

scala:

  import java.sql.{PreparedStatement, Connection, DriverManager}
  import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
import java.util.Properties
  /** Batch job: reads the AccountStats and Account tables from MySQL via Spark SQL,
    * joins them, computes per-account/per-day averages of heating and hot-water
    * hours, and writes the union back to the AccountStatsAggregate table.
    */
  object hoursAvg {
    def main(args: Array[String]): Unit = {  // BUG FIX: `Array` alone does not compile — needs [String]
      val sparkConf = new SparkConf().setAppName("RDDRelation").setMaster("spark://Master.Hadoop:7077")
      val sc = new SparkContext(sparkConf)
      val sqlContext = new SQLContext(sc)
      try {
        // Ensure the MySQL JDBC driver is loaded on the driver JVM.
        Class.forName("com.mysql.jdbc.Driver")
        val url = "jdbc:mysql://IP:port/db"
        // NOTE(review): credentials should come from configuration or the environment,
        // not from source code — preserved here only because this is sample code.
        val prop = new Properties()
        prop.setProperty("user", "greenwave")
        prop.setProperty("password", "green4irvine")

        val accountstatsDF = sqlContext.read.jdbc(url, "AccountStats", prop)
        val accountDF = sqlContext.read.jdbc(url, "Account", prop)
        accountstatsDF.registerTempTable("accountstatsDF")
        accountDF.registerTempTable("accountDF")

        // Join stats rows with their account to pick up UtilityAccountID.
        val selData = sqlContext.sql("select s.*, a.UtilityAccountID from accountstatsDF s, accountDF a where a.AccountID=s.AccountID")
        selData.registerTempTable("selData")

        // Split UpdateDate into date and time parts.
        // NOTE(review): SubString(UpdateDate, 12, 19) takes 19 chars starting at pos 12 —
        // for a typical "yyyy-MM-dd HH:mm:ss" value a length of 8 was probably intended;
        // preserved as-is pending confirmation of the UpdateDate format.
        val selData_date_time = sqlContext.sql("select AccountID, SubString(UpdateDate, 1, 10) as Date,SubString(UpdateDate, 12, 19) as Time, Period, StatsType, StatsVal, UtilityAccountID from selData")
        selData_date_time.registerTempTable("selData_date_time")

        // Shared pipeline for one stats type. BUG FIX: the original filtered
        // selData_date_time with a column taken from the *unrelated* selData frame
        // (selData("StatsType")); the filter must use the frame's own column.
        def averagesFor(statsType: String) = {
          val grouped = selData_date_time
            .filter(selData_date_time("StatsType") === statsType)
            .groupBy("UtilityAccountID", "Date", "Period", "StatsType")
            .avg("StatsVal")
          // avg() names its output column "AVG(StatsVal)"; rename it back to StatsVal.
          grouped.select(
            grouped("UtilityAccountID"),
            grouped("Period"),
            grouped("Date"),
            grouped("StatsType"),
            grouped("AVG(StatsVal)").as("StatsVal"))
        }

        val result = averagesFor("EON_SH.heatinghours").unionAll(averagesFor("EON_SH.hotwaterhours"))
        // Overwrite the aggregate table with the freshly computed averages.
        result.write.mode("overwrite").jdbc(url, "AccountStatsAggregate", prop)
      } finally {
        sc.stop()  // release cluster resources even if the job fails
      }
    }
  }
  sbt:
  name := "hoursAvg"
  version := "1.0"
  // BUG FIX: spark-core was pinned at 1.2.0 while spark-sql was 1.4.0 — Spark
  // modules must share one version or they are binary-incompatible at runtime.
  // Both %% forms resolve against scalaVersion 2.10.4, matching the previous
  // explicit "spark-sql_2.10" suffix.
  val sparkVersion = "1.4.0"
  val apacheSpark = "org.apache.spark" %% "spark-core" % sparkVersion
  val apacheSQL = "mysql" % "mysql-connector-java" % "5.1.37"
  val apacheSSQL = "org.apache.spark" %% "spark-sql" % sparkVersion
  lazy val commonSettings = Seq(
    organization := "com.gws",
    version := "0.1.0",
    scalaVersion := "2.10.4"
  )
  lazy val root = (project in file(".")).
    settings(commonSettings: _*).
    settings(
      name := "hoursAvg",
      libraryDependencies ++= Seq(
        apacheSQL,
        apacheSSQL,
        // Exclusions drop transitive deps that clash with the cluster's classpath.
        apacheSpark.
          exclude("com.esotericsoftware.kryo", "kryo").
          exclude("javax.activation", "activation").
          exclude("commons-logging", "commons-logging").
          exclude("commons-collections", "commons-collections").
          exclude("org.eclipse.jetty.orbit", "javax.transaction").
          exclude("org.eclipse.jetty.orbit", "javax.servlet").
          exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish").
          exclude("org.eclipse.jetty.orbit", "javax.activation")
      )
    )
页: [1]
查看完整版本: spark scala mysql 语法