sdfwe posted on 2015-11-5 09:28:16

DStreamWordCount
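Two snippets follow: a Spark Streaming driver (DStreamWordCount) that prints a word count every 10 seconds, and a custom Receiver (CustomReceiver) that recursively reads the files under a local directory and stores each comma-separated word as one record.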

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by Administrator on 2015/10/19.
*/
object DStreamWordCount {
  def main(args: Array[String]) {
    // A receiver occupies one core, so use at least local[2];
    // with plain "local" there is no core left to process the data.
    val conf = new SparkConf().setMaster("local[2]").setAppName("wordcount")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    //val lines = ssc.socketTextStream("localhost", 8888, StorageLevel.MEMORY_AND_DISK_SER)
    val lines = ssc.receiverStream(new CustomReceiver("E:\\backup\\test"))
    // The receiver stores one word per record, so map is enough here
    val wordCount = lines.map(word => (word, 1)).reduceByKey(_ + _)
    wordCount.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
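Note that CustomReceiver (below) already splits each line on commas and stores individual words, which is why a plain map suffices above. If the receiver stored whole lines instead, the driver would have to split them itself; a hypothetical variant:

    // Hypothetical: only needed if the receiver called store(line) with whole lines
    val wordCount = lines.flatMap(_.split(","))
      .map(word => (word, 1))
      .reduceByKey(_ + _)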





CustomReceiver

import java.io.File

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Receiver is generic in the record type it stores; words are Strings
class CustomReceiver(dir: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

def onStart() {
    // Start the thread that reads the files under `dir` and stores the words
    new Thread("File Receiver") {
      override def run() { receive() }
    }.start()
}

def onStop() {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
}

def recursiveListFiles(f: File): Array[File] = {
    // listFiles returns null when f is not a readable directory
    val these = Option(f.listFiles).getOrElse(Array.empty[File])
    these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
}

private def receive() {
    // recursiveListFiles also returns directories, so keep regular files only,
    // and stop reading as soon as the receiver is asked to stop
    for (f <- recursiveListFiles(new File(dir)) if f.isFile && !isStopped()) {
      val source = scala.io.Source.fromFile(f)

      // Each line is comma-separated; store every word as its own record
      source.getLines().foreach { line =>
        line.split(",").foreach(word => store(word))
      }

      source.close()
    }
}
}
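The Receiver API also provides restart(message, error), which asynchronously calls onStop() and then onStart() again. A minimal sketch (not in the original post) of hardening receive() so that an I/O failure re-launches the receiver instead of silently killing its thread:

    private def receive() {
        try {
          for (f <- recursiveListFiles(new File(dir)) if f.isFile && !isStopped()) {
            val source = scala.io.Source.fromFile(f)
            try {
              source.getLines().foreach(_.split(",").foreach(word => store(word)))
            } finally {
              source.close()
            }
          }
        } catch {
          case e: java.io.IOException =>
            // restart() stops the receiver and schedules a delayed restart
            restart("Error reading files from " + dir, e)
        }
    }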
