qqruser posted on 2018-10-22 09:28:29

Spark SQL

package org.apache.spark.sql

import org.apache.spark.{SparkConf, SparkContext}

// Reads three Parquet event tables, keys each event by order number,
// unions them, filters each order's event sequence against the SLA
// rules, and writes the surviving orders as tab-separated text.
// FilterSLA and Utils are project helpers defined elsewhere.
object SLA_parquetSQL {

  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("SLA Filter"))
    val sqlContext = new SQLContext(sc)
    val suffix = args(0)

    // Register each Parquet file as a temp table (Spark 1.x API;
    // newer versions use sqlContext.read.parquet instead).
    sqlContext.parquetFile("/user/hive/warehouse/sla_parquet.db/e60001_shipment_exported_" + suffix).registerTempTable("e60001_shipment_exported")
    sqlContext.parquetFile("/user/hive/warehouse/sla_parquet.db/e62005_shipment_shipped_and_closed_" + suffix).registerTempTable("e62005_shipment_shipped_and_closed")
    sqlContext.parquetFile("/user/hive/warehouse/sla_parquet.db/e62006_shipment_canceled_and_closed_" + suffix).registerTempTable("e62006_shipment_canceled_and_closed")

    // Key each row by ordernumber; keep (type_id, event_time truncated
    // to the first 19 characters, i.e. "yyyy-MM-dd HH:mm:ss").
    val e60001_shipment_exported = sqlContext.sql("select ordernumber, type_id, event_time from e60001_shipment_exported").map(line => (line(0).toString, (line(1).toString, line(2).toString.substring(0, 19))))
    val e62005_shipment_shipped_and_closed = sqlContext.sql("select ordernumber, type_id, event_time from e62005_shipment_shipped_and_closed").map(line => (line(0).toString, (line(1).toString, line(2).toString.substring(0, 19))))
    val e62006_shipment_canceled_and_closed = sqlContext.sql("select ordernumber, type_id, event_time from e62006_shipment_canceled_and_closed").map(line => (line(0).toString, (line(1).toString, line(2).toString.substring(0, 19))))

    // Union the three event streams so every event for an order lands
    // under the same key.
    val un = e60001_shipment_exported.union(e62005_shipment_shipped_and_closed).union(e62006_shipment_canceled_and_closed)

    // Group events per order, keep only orders whose event sequence
    // passes the SLA check, and write "ordernumber\t<flattened values>".
    un.groupByKey().filter(kv => FilterSLA.filterSLA(kv._2.toSeq)).map(kv => kv._1 + "\t" + Utils.flatValues(kv._2.toSeq)).saveAsTextFile(args(1))

    sc.stop()
  }
}
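The job calls FilterSLA.filterSLA and Utils.flatValues, which are not included in the post. Purely as an illustrative sketch of what such helpers could look like (the actual logic is a guess: the 24-hour deadline, the choice to keep SLA violations rather than compliant orders, and the output format are all assumptions):

import java.text.SimpleDateFormat

// Hypothetical stand-ins for the helpers the job references; the
// real FilterSLA / Utils are defined elsewhere in the project.
object FilterSLA {
  private val slaMillis = 24L * 3600 * 1000 // assumed 24-hour SLA window

  // events: (type_id, event_time) pairs for a single order.
  // Keeps the order when closing the shipment exceeded the assumed SLA.
  def filterSLA(events: Seq[(String, String)]): Boolean = {
    // SimpleDateFormat is not thread-safe, so create it per call.
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val byType = events.toMap // keeps the last event per type; enough for a sketch
    byType.get("60001").exists { exported =>
      byType.get("62005").orElse(byType.get("62006")).exists { closed =>
        fmt.parse(closed).getTime - fmt.parse(exported).getTime > slaMillis
      }
    }
  }
}

object Utils {
  // Flatten (type_id, event_time) pairs into one tab-separated string.
  def flatValues(events: Seq[(String, String)]): String =
    events.map { case (t, ts) => t + ":" + ts }.mkString("\t")
}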
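To run the job, a spark-submit along these lines should work (the jar name and argument values are just examples; args(0) is the table-name suffix, args(1) the output directory):

spark-submit --class org.apache.spark.sql.SLA_parquetSQL \
  --master yarn \
  sla-filter.jar 20181022 /user/out/sla_result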

