Posted on 2017-04-19
  Kafka(3)Latest Zookeeper and Kafka With Scala 

1. Install the latest Zookeeper
http://zookeeper.apache.org/doc/r3.4.5/

Download and extract the latest release, zookeeper-3.4.5.tar.gz, then soft link the directory
>sudo ln -s /Users/carl/tool/zookeeper-3.4.5 /opt/zookeeper-3.4.5
>sudo ln -s /opt/zookeeper-3.4.5 /opt/zookeeper

Add this to the system path
>vi ~/.profile
export PATH=/opt/zookeeper/bin:$PATH
>. ~/.profile

Copy the sample configuration into place
>cp conf/zoo_sample.cfg conf/zoo.cfg
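For reference, the sample file boils down to a few settings (values below are the defaults shipped in zoo_sample.cfg for the 3.4.x line):

```
# conf/zoo.cfg -- minimal standalone configuration
tickTime=2000          # basic time unit in milliseconds
initLimit=10           # ticks a follower may take to connect and sync
syncLimit=5            # ticks a follower may lag before being dropped
dataDir=/tmp/zookeeper # where snapshots and the myid file live
clientPort=2181        # port clients connect to
```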

And start the server like this
>zkServer.sh start zoo.cfg

Use jps to check that the server is running
>jps
1957
10014 QuorumPeerMain
2260
10050 Jps

Connect with the client
>zkCli.sh -server localhost:2181
zookeeper>help
zookeeper>quit

Stop the server
>zkServer.sh stop

Other subcommands include status, restart, upgrade, start, and stop.

Configure a cluster as follows, with one config file per node
zoo1.cfg
dataDir=/tmp/zookeeper/zoo1 
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
clientPort=2181 

zoo2.cfg
dataDir=/tmp/zookeeper/zoo2  
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
clientPort=2182

zoo3.cfg
dataDir=/tmp/zookeeper/zoo3  
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
clientPort=2183 

>vi /tmp/zookeeper/zoo1/myid
1
>vi /tmp/zookeeper/zoo2/myid
2
>vi /tmp/zookeeper/zoo3/myid
3
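The three data directories and myid files above can be created in one pass; a sketch, with paths matching the dataDir values in the configs:

```shell
# create each node's data directory and write its id into the myid file
for i in 1 2 3; do
  mkdir -p /tmp/zookeeper/zoo$i
  echo $i > /tmp/zookeeper/zoo$i/myid
done
```

Each myid must match the `server.N` entry that points at that node's ports.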

Start 3 nodes
>zkServer.sh start zoo1.cfg
>zkServer.sh start zoo2.cfg
>zkServer.sh start zoo3.cfg

>jps

Or connect with the client to verify
>zkCli.sh -server localhost:2181

>zkServer.sh stop zoo1.cfg
>zkServer.sh stop zoo2.cfg
>zkServer.sh stop zoo3.cfg

2. Working with the Latest Kafka
Download the latest source package from here https://dist.apache.org/repos/dist/release/kafka/kafka-0.8.0-beta1-src.tgz

>mv kafka-0.8.0-beta1-src kafka-0.8.0-beta1
>cd /Users/carl/tool/kafka-0.8.0-beta1
>./sbt clean
>./sbt update
>./sbt package
>./sbt assembly-package-dependency

>sudo ln -s /Users/carl/tool/kafka-0.8.0-beta1 /opt/kafka-0.8.0-beta1
>sudo ln -s /opt/kafka-0.8.0-beta1 /opt/kafka

>sudo vi ~/.profile
export PATH=/opt/kafka/bin:$PATH 
>. ~/.profile

Start Zookeeper first, then start the Kafka server
>kafka-server-start.sh config/server.properties

As of Feb 11, 2014, the latest version is this one: http://apache.osuosl.org/kafka/0.8.0/kafka-0.8.0-src.tgz

3. Find the Jar and Prepare the Scala Client 
>./sbt release-zip 
>./sbt publish-local
Find the dependencies from here
>cat /opt/kafka/core/build.sbt

Add these 2 lines to the client project's libraryDependencies
        "com.101tec"          %   "zkclient"                  % "0.3",
        "org.apache.kafka"    %   "kafka_2.8.0"               % "0.8.0-beta1"
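In context, those two lines sit inside an sbt libraryDependencies block; a minimal sketch of the client project's build.sbt, where everything except the two dependencies above is assumed boilerplate:

```scala
// build.sbt for a client project (sketch; only the kafka and zkclient
// entries come from the text above, the rest is assumed)
name := "kafka-client-demo"

scalaVersion := "2.9.2"

libraryDependencies ++= Seq(
  "com.101tec"       % "zkclient"    % "0.3",
  "org.apache.kafka" % "kafka_2.8.0" % "0.8.0-beta1"
)
```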

Try to fetch the latest code
>git clone https://github.com/apache/kafka.git
>git clone https://github.com/kevinwright/kafka.git kafka_2.10

Or try this command directly on the official code
>./sbt "++2.9.2 package" 
>./sbt "++2.9.2 publish-local"

It does not work for
>./sbt "++2.10.0 package"
So I change core/build.sbt
>vi core/build.sbt
case "2.10.0"=> "org.scalatest" %  "scalatest_2.10" % "1.9.1" % "test"
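That bare case line only makes sense inside a Scala-version match in core/build.sbt; a hedged sketch of the surrounding shape (the actual Kafka build file differs in detail):

```scala
// sketch: pick a scalatest artifact per cross-build Scala version
libraryDependencies <+= scalaVersion {
  case "2.10.0" => "org.scalatest" % "scalatest_2.10" % "1.9.1" % "test"
  case _        => "org.scalatest" %% "scalatest"     % "1.8"   % "test"
}
```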

That raises another error, so I try the kafka_2.10 fork and switch to branch 0.8
>./sbt "++2.10.0 package"
>./sbt +package
 
Go back to the official git URL and try this command
>./sbt "++2.9.2 publish-local"

Then depend on the locally published artifact
"org.apache.kafka"    %   "kafka_2.9.2"               % "0.8.0-beta1",

Error Message
java.lang.ClassNotFoundException: scala.reflect.ClassManifest 

Solution:
I do not know how to fix this right now; it looks like a Scala binary-compatibility problem, since jars built against one Scala major version generally cannot run on another.

I will leave this problem for the future and just record some code I have written.

package com.sillycat.superduty.jobs.producer

import java.util.Properties

import kafka.javaapi.producer.Producer
import kafka.producer.{ KeyedMessage, ProducerConfig }

object NewTaskKafka extends App {
  val props: Properties = new Properties()
  props.put("metadata.broker.list", "localhost:9092")
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  // the zk.* properties are left over from the 0.7 producer API;
  // the 0.8 producer discovers brokers via metadata.broker.list instead
  props.put("zk.connect", "localhost:2181")
  props.put("zk.connectiontimeout.ms", "15000")

  val config: ProducerConfig = new ProducerConfig(props)

  val producer: Producer[String, String] = new Producer[String, String](config)

  // KeyedMessage(topic, message) -- one message for the "campaign" topic
  val data = new KeyedMessage[String, String]("campaign", "test-message, it is ok")
  producer.send(data)
  producer.close()
}

package com.sillycat.superduty.jobs.consumer

import kafka.api.{ FetchRequestBuilder, FetchRequest }
import kafka.javaapi.consumer.SimpleConsumer
import kafka.javaapi.FetchResponse
import kafka.javaapi.message.ByteBufferMessageSet
import scala.collection.JavaConversions._
import java.nio.ByteBuffer
import com.typesafe.scalalogging.slf4j.Logging

object WorkerKafka extends App with Logging {
  val simpleConsumer: SimpleConsumer = new SimpleConsumer("localhost", 9092, 10000, 1024000, "worker")

  val req: FetchRequest = new FetchRequestBuilder()
    .clientId("worker")
    .addFetch("test", 0, 0L, 100)
    .build()

  while (true) {
    val fetchResponse: FetchResponse = simpleConsumer.fetch(req)
    val messageSet: ByteBufferMessageSet = fetchResponse.messageSet("test", 0)

    // NOTE: this sketch always fetches from offset 0L; a real consumer
    // should advance the offset using each message's offset field
    messageSet foreach { msg =>
      val buffer: ByteBuffer = msg.message.payload
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes) // copy the payload out of the buffer before decoding
      logger.debug("message=" + new String(bytes, "UTF-8"))
    }
  }

}

Solution:
Same code; just update the dependencies as follows.
"org.apache.kafka"          %   "kafka_2.10"            % "0.8.0" intransitive(),
"com.yammer.metrics"        %   "metrics-core"          % "2.2.0",

Since I am using the latest version of Kafka, I need to either enable automatic topic creation or create topics manually.
I prefer manual creation.
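For the automatic route, the relevant broker setting is a one-liner; a hedged sketch for the 0.8 broker config:

```
# config/server.properties
# create a topic automatically the first time a producer/consumer uses it
auto.create.topics.enable=true
```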
Create topic
>bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test

List topic
>bin/kafka-list-topic.sh --zookeeper localhost:2181

Send and consume messages
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

That is cool.
Later, I will try the Scala client and multiple nodes.

References:
Kafka 1~2
http://sillycat.iteye.com/blog/1563312
http://sillycat.iteye.com/blog/1563314

http://kafka.apache.org/

kafka cluster
http://www.jonzobrist.com/2012/04/17/install-apache-kafka-and-zookeeper-on-ubuntu-10-04/
http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/
http://kafka.apache.org/08/quickstart.html

zookeeper
http://sillycat.iteye.com/blog/1556108
http://sillycat.iteye.com/blog/1556141
http://rdc.taobao.com/team/jm/archives/665
http://blog.javachen.com/hadoop/2013/08/23/publish-proerties-using-zookeeper/
http://rdc.taobao.com/team/jm/archives/tag/zookeeper
https://github.com/alibaba/taokeeper
