flume+kafka+sparkstreaming搭建整合
主要是数据从flume进去kafka,然后交给sparkstreaming处理的流程本文依旧以单词计数例子为例
首先,flume使用1.6版本,如果是1.6以下的话,没带有官方的kafkasink,需要自己实现,自己实现也不难实现,写一个自定义的sink,在里面方法调用kafka生产者代码,把数据发送到指定的kafka的broker的topic即可。
此处使用1.6版本,直接使用kafkaSink即可
agent4.channels.ch1.type = memory
agent4.sources.avro-source1.channels = ch1
agent4.sources.avro-source1.type = avro
agent4.sources.avro-source1.bind = 0.0.0.0
agent4.sources.avro-source1.port = 41414
agent4.sinks.log-sink1.channel = ch1
agent4.sinks.log-sink1.type = org.apache.flume.sink.kafka.KafkaSink
agent4.sinks.log-sink1.topic = test
agent4.sinks.log-sink1.brokerList = localhost:9092
agent4.sinks.log-sink1.requiredAcks = 1
agent4.sinks.log-sink1.batchSize = 20
agent4.channels = ch1
agent4.sources = avro-source1
agent4.sinks = log-sink1
然后启动flume即可 bin/flume-ng agent --conf ./conf/ -f conf/agent4 -Dflume.root.logger=DEBUG,console -n agent4
开始进行kafka的操作,这里都是单机部署,包括flume和kafka
首先启动zookeeper服务bin/zookeeper-server-start.sh config/zookeeper.properties
然后启动Kafkabin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
然后创建一个"test"的topic,一个分区一个副本bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
可以查看一下所有主题,验证一下bin/kafka-topics.sh --list --zookeeper localhost:2181
然后开始写spark部分的代码
首先加入kafka的maven依赖<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.2.1</version>
</dependency>
代码如下:package cn.han
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka._
object MySparkkafka {
def main(args: Array): Unit = {
System.setProperty("hadoop.home.dir","E:\\hadoop-2.6.0")
val sc = new SparkContext("local", "Spark Streaming kafka Integration")
//创建StreamingContext,3秒一个批次
val ssc = new StreamingContext(sc, Seconds(4))
val kafkaParams: Map = Map("group.id" -> "terran")
val readParallelism = 5
val topics = Map("test" -> 1)
val sl=StorageLevel.MEMORY_ONLY
val kafkaStream=KafkaUtils.createStream(ssc,"192.168.5.220:2181","terran", topics, sl)
val rss2=kafkaStream.flatMap(x =>{
val by=x._2
val sb=by.split(" ")
sb
})
val rdd3=rss2.map(x=>(x,1))
val rdd4=rdd3.reduceByKey(_+_)
rdd4.print()
//开始运行
ssc.start()
//计算完毕退出
ssc.awaitTermination()
sc.stop()
}
}
还需要一个简单的log4j程序,向flume写入测试数据,上一篇博客已经引入,这里就不再赘述了。最终,执行spark代码即可。
页:
[1]