(安西) posted on 2017-5-21 14:02:12

Flume NG 1.6 + Kafka sink

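  The official release now ships a Kafka sink. Every configuration I found online was wrong, so I ended up reading the source code to get it working.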
  #### source
  a1.sources = sysSrc
  a1.sources.sysSrc.type=syslogudp
  a1.sources.sysSrc.bind=0.0.0.0
  a1.sources.sysSrc.port=3333
  a1.sources.sysSrc.channels=fileChannel
  #a1.sources.log4jSrc.type =
  #a1.sources.log4jSrc.channels = fileChannel
  #### channel
  a1.channels = fileChannel
  # note: despite its name, fileChannel is a memory channel
  a1.channels.fileChannel.type=memory
  a1.channels.fileChannel.capacity=100
  #### sink
  a1.sinks = kafkaSink
  a1.sinks.kafkaSink.channel=fileChannel
  a1.sinks.kafkaSink.type=org.apache.flume.sink.kafka.KafkaSink
  a1.sinks.kafkaSink.brokerList=10.73.195.136:9092
  a1.sinks.kafkaSink.custom.partition.key=kafkaPartition
  a1.sinks.kafkaSink.topic=cmjTopic
  a1.sinks.kafkaSink.serializer.class=kafka.serializer.StringEncoder
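A mistyped channel name is easy to miss in a file like this. As a rough pre-flight check, here is a sketch that lists any channel a source or sink refers to but `a1.channels` does not declare. It is not part of Flume; the function name `check_channels` is made up for this example, and it only understands simple `key=value` lines like the ones above.

```shell
# check_channels FILE AGENT
# Prints every channel that a source or sink references but that is not
# declared in "<AGENT>.channels". Returns non-zero if any are found.
check_channels() {
  awk -v agent="$2" '
    /^[ \t]*#/ { next }                       # skip comment lines
    {
      line = $0
      gsub(/^[ \t]+|[ \t]+$/, "", line)       # trim the whole line
      eq = index(line, "=")
      if (eq == 0) next                       # not a key=value line
      key = substr(line, 1, eq - 1)
      val = substr(line, eq + 1)
      gsub(/[ \t]+$/, "", key)
      gsub(/^[ \t]+/, "", val)
      if (key == agent ".channels") {
        n = split(val, names, /[ \t]+/)
        for (i = 1; i <= n; i++) declared[names[i]] = 1
      } else if (key ~ ("^" agent "\\.(sources\\.[^.]+\\.channels|sinks\\.[^.]+\\.channel)$")) {
        n = split(val, names, /[ \t]+/)
        for (i = 1; i <= n; i++) used[names[i]] = key
      }
    }
    END {
      for (c in used)
        if (!(c in declared)) {
          print "undeclared channel: " c " (" used[c] ")"
          bad = 1
        }
      exit bad
    }
  ' "$1"
}
```

With the config above saved as conf/a1.properties, `check_channels conf/a1.properties a1` should print nothing and return 0.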
  Start the agent:
  bin/flume-ng agent -c conf/ -f conf/a1.properties -Dflume.root.logger=DEBUG,console -n a1
  From a client console, send a test message:
  echo "aaaaaaaaa" |nc -u 127.0.0.1 3333
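To push more than one line through the pipeline, the same `nc` call can be wrapped in a loop. A small sketch, assuming your `nc` accepts `-w` (timeout in seconds) so that each call exits after sending; the function names and the `flume-test` prefix are placeholders, not anything Flume requires.

```shell
# gen_messages COUNT [PREFIX]
# Prints COUNT numbered test lines, e.g. flume-test-1, flume-test-2, ...
gen_messages() {
  count="$1"
  prefix="${2:-flume-test}"
  i=1
  while [ "$i" -le "$count" ]; do
    printf '%s-%d\n' "$prefix" "$i"
    i=$((i + 1))
  done
}

# send_udp HOST PORT
# Reads lines from stdin and sends each one as its own UDP datagram.
send_udp() {
  host="$1"
  port="$2"
  while IFS= read -r line; do
    printf '%s\n' "$line" | nc -u -w1 "$host" "$port"
  done
}
```

For example, `gen_messages 5 | send_udp 127.0.0.1 3333` sends five numbered lines to the syslog UDP source configured above.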
  The agent console shows:
  2015-08-19 16:47:58,342 (SinkRunner-PollingRunner-DefaultSinkProcessor) {Event} cmjTopic : null : aaaaaaaaa
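The debug line appears to follow the pattern `{Event} <topic> : <key> : <body>`, in which case the `null` in the middle would be the unset message key. That layout is inferred from the single line above, so treat it as a guess. Under that assumption, a small helper (the name `parse_event_line` is made up here) can pull the fields out of a saved agent log line:

```shell
# parse_event_line LINE
# Extracts topic, key and body from a "{Event} topic : key : body" log line.
# Prints nothing if the line does not match the assumed layout.
parse_event_line() {
  printf '%s\n' "$1" |
    sed -n 's/^.*{Event} \([^ ]*\) : \([^ ]*\) : \(.*\)$/topic=\1 key=\2 body=\3/p'
}
```

Running it on the line above should give `topic=cmjTopic key=null body=aaaaaaaaa`.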
  Open a Kafka console consumer:
  bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic cmjTopic --from-beginning
  The consumer prints:
  aaaaaaaaa