小风儿 posted on 2017-12-15 11:12:22

Connecting Spark to MongoDB from Scala
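The snippets below assume Spark 2.x plus the MongoDB Spark connector on the classpath. A minimal sbt sketch (the version numbers are assumptions; match them to your own Spark and Scala build):

// build.sbt (illustrative versions only)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.2.0" % "provided",
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0"
)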

import org.apache.spark.sql.SparkSession
import com.mongodb.spark._
import com.mongodb.spark.config._
import org.bson.Document

val spark = SparkSession.builder()
  .master("local")
  .appName("MongoSparkConnector")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// MongoDB connection URI (no database here; database and collection are passed per read/write below)
val uri = "mongodb://172.1.1.1:27017"
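The connection details can also be set once on the SparkSession instead of on every read and write. A sketch of that variant (the database.collection part of the URI is an assumption for this example):

// Alternative: configure default input/output URIs on the session itself
val sparkWithMongoConf = SparkSession.builder()
  .master("local")
  .appName("MongoSparkConnector")
  .config("spark.mongodb.input.uri", "mongodb://172.1.1.1:27017/test.test_table")
  .config("spark.mongodb.output.uri", "mongodb://172.1.1.1:27017/test.test_table")
  .getOrCreate()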
  

// Build a sample DataFrame from an existing table in the Spark catalog
val userDF = spark.sql("""
    select
      uid,
      name,
      current_date() version
    from test_table
    limit 100
  """).repartition(8)
  

  
// Write to MongoDB
userDF.write
  .mode("overwrite")
  .format("com.mongodb.spark.sql")
  .options(Map(
    "uri" -> uri,
    "database" -> "test",
    "collection" -> "test_table"))
  .save()
  

  
// Read from MongoDB
val df = spark.read
  .format("com.mongodb.spark.sql")
  .options(Map(
    "uri" -> uri,
    "database" -> "test",
    "collection" -> "test_table"))
  .load()
  

  
// Other approaches: pass the fully-qualified spark.mongodb.* keys as options
userDF.write
  .mode("overwrite")
  .format("com.mongodb.spark.sql")
  .options(Map(
    "spark.mongodb.input.uri" -> uri,
    "spark.mongodb.output.uri" -> uri,
    "spark.mongodb.output.database" -> "test",
    "spark.mongodb.output.collection" -> "test_table"))
  .save()
  

MongoSpark.save(
  userDF.write
    .mode("overwrite")
    .options(Map(
      "spark.mongodb.input.uri" -> uri,
      "spark.mongodb.output.uri" -> uri,
      "spark.mongodb.output.database" -> "test",
      "spark.mongodb.output.collection" -> "test_table")))
  

MongoSpark.save(
  userDF.write
    .mode("overwrite")
    .options(Map(
      "uri" -> uri,
      "database" -> "test",
      "collection" -> "test_table")))
  

spark.stop()
  