import com.mongodb.BasicDBObject
import com.mongodb.hadoop.MongoOutputFormat
import com.mongodb.hadoop.io.MongoUpdateWritable
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BasicBSONObject

/**
 * Writes a Spark RDD to MongoDB through mongo-hadoop's MongoOutputFormat,
 * upserting one document per input line via MongoUpdateWritable.
 * Created by Administrator on 2016/8/23.
 */
object TestMongoDB {
  def main(args: Array[String]): Unit = {
    // Required on Windows so Hadoop can locate winutils in the local Hadoop installation.
    System.setProperty("hadoop.home.dir", "E:/hadoop-2.4.1/")
    val conf = new SparkConf().setAppName("macType").setMaster("local[10]")
    val sc = new SparkContext(conf)
    val data = sc.textFile("F:/aa.txt")
    insertResultToMongoDB(data)
    sc.stop()
  }

  /** Upserts one document per input line into the collection named by mongo.output.uri. */
  def insertResultToMongoDB(rdd1: RDD[String]): Unit = {
    val config = new Configuration()
    config.set("mongo.output.uri", "mongodb://localhost:27017/admin.zz") // output database and collection

    val rdd = rdd1.map { s =>
      val bson = new BasicBSONObject()
      val time = System.currentTimeMillis()
      bson.put("_id", s)
      bson.put("xxx", time.toString)
      (s, new MongoUpdateWritable(
        new BasicDBObject("_id", s),     // query: match on _id
        new BasicDBObject("$set", bson), // update operation
        true,                            // upsert: insert if no matching document exists
        false,                           // multiUpdate: update at most one document
        false                            // replace: apply the modifiers instead of replacing the whole document
      ))
    }

    // Inspect the (key, MongoUpdateWritable) pairs; with a local master the
    // output is printed to the driver console.
    rdd.foreach { case (key, update) => println(key + " " + update) }
    println("-------------------")

    // MongoOutputFormat ignores the path argument and writes to the collection
    // configured by mongo.output.uri, so any placeholder path works here.
    rdd.saveAsNewAPIHadoopFile(
      "file:///this-is-completely-unused",
      classOf[Object],
      classOf[MongoUpdateWritable],
      classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
      config)
  }
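
  // Sketch (not part of the original code): reading the upserted collection back with
  // mongo-hadoop's MongoInputFormat, e.g. to verify the output. The input URI is an
  // assumption and should match the output URI used above.
  def readFromMongoDB(sc: SparkContext): RDD[(Object, org.bson.BSONObject)] = {
    val config = new Configuration()
    config.set("mongo.input.uri", "mongodb://localhost:27017/admin.zz")
    sc.newAPIHadoopRDD(
      config,
      classOf[com.mongodb.hadoop.MongoInputFormat],
      classOf[Object],
      classOf[org.bson.BSONObject])
  }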
}
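
// Build dependencies, as a sketch (exact versions are assumptions and may need adjusting
// to your Spark and MongoDB setup), e.g. in sbt:
//   libraryDependencies ++= Seq(
//     "org.apache.spark"         %% "spark-core"        % "1.6.1",
//     "org.mongodb.mongo-hadoop"  % "mongo-hadoop-core" % "1.5.2",
//     "org.mongodb"               % "mongo-java-driver" % "3.2.2"
//   )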