|
Kafka(3)Latest Zookeeper and Kafka With Scala
1. Install the latest Zookeeper
http://zookeeper.apache.org/doc/r3.4.5/
Download and get the latest file zookeeper-3.4.5.tar.gz
Soft link the directory
>sudo ln -s /Users/carl/tool/zookeeper-3.4.5 /opt/zookeeper-3.4.5
>sudo ln -s /opt/zookeeper-3.4.5 /opt/zookeeper
Add this to the system path
>vi ~/.profile
export PATH=/opt/zookeeper/bin:$PATH
>. ~/.profile
Put the default configuration file
>cp conf/zoo_sample.cfg conf/zoo.cfg
And start the server like this
>zkServer.sh start zoo.cfg
Use JPS to check if the server is running
>jps
1957
10014 QuorumPeerMain
2260
10050 Jps
Connecting with client
>zkCli.sh -server localhost:2181
zookeeper>help
zookeeper>quit
Stop the server
>zkServer.sh stop
There are status, restart, upgrade, start, stop...
Configure to Cluster as follow
zoo1.cfg
dataDir=/tmp/zookeeper/zoo1
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
clientPort=2181
zoo2.cfg
dataDir=/tmp/zookeeper/zoo2
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
clientPort=2182
zoo3.cfg
dataDir=/tmp/zookeeper/zoo3
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
clientPort=2183
>vi /tmp/zookeeper/zoo1/myid
1
>vi /tmp/zookeeper/zoo2/myid
2
>vi /tmp/zookeeper/zoo3/myid
3
Start 3 nodes
>zkServer.sh start zoo1.cfg
>zkServer.sh start zoo2.cfg
>zkServer.sh start zoo3.cfg
>jps
Or use client to connect
>zkCli.sh -server localhost:2181
>zkServer.sh stop zoo1.cfg
>zkServer.sh stop zoo2.cfg
>zkServer.sh stop zoo3.cfg
2. Working with Latest kafka
Download the latest source package from here https://dist.apache.org/repos/dist/release/kafka/kafka-0.8.0-beta1-src.tgz
>mv kafka-0.8.0-beta1-src kafka-0.8.0-beta1
>cd /Users/carl/tool/kafka-0.8.0-beta1
>./sbt clean
>./sbt update
>./sbt package
>./sbt assembly-package-dependency
>sudo ln -s /Users/carl/tool/kafka-0.8.0-beta1 /opt/kafka-0.8.0-beta1
>sudo ln -s /opt/kafka-0.8.0-beta1 /opt/kafka
>sudo vi ~/.profile
export PATH=/opt/kafka/bin:$PATH
>. ~/.profile
>kafka-server-start.sh config/server.properties
Start the zookeeper first.
Today is Feb-11-2014, the latest version is this one http://apache.osuosl.org/kafka/0.8.0/kafka-0.8.0-src.tgz
3. Find the Jar and Prepare the Scala Client
>./sbt release-zip
>./sbt publish-local
Find the dependencies from here
>cat /opt/kafka/core/build.sbt
Add these 2 lines
"com.101tec" % "zkclient" % "0.3",
"org.apache.kafka" % "kafka_2.8.0" % "0.8.0-beta1"
Try to fetch the latest codes
>git clone https://github.com/apache/kafka.git
>git clone https://github.com/kevinwright/kafka.git kafka_2.10
Or try this command directly on the official codes
>./sbt "++2.9.2 package"
>./sbt "++2.9.2 publish-local"
It is not working for
>./sbt "++2.10.0 package"
So I change the build.sbt
>vi core/build.sbt
case "2.10.0"=> "org.scalatest" % "scalatest_2.10" % "1.9.1" % "test"
Have other error. So try with the kafka_2.10 and switch to branch 0.8
>./sbt "++2.10.0 package"
>./sbt +package
Go back to the official git URL and try this command
>./sbt "++2.9.2 publish-local"
"org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1",
Error Message
java.lang.ClassNotFoundException: scala.reflect.ClassManifest
Solution:
I do not know how to fix this right now.
I will leave this problem for the future. Just record some codes I have wrote.
package com.sillycat.superduty.jobs.producer
import java.util.Properties
import kafka.javaapi.producer.Producer
import kafka.producer.{ KeyedMessage, ProducerConfig }
object NewTaskKafka extends App {
val props2: Properties = new Properties()
props2.put("zk.connect", "localhost:2181")
props2.put("metadata.broker.list", "localhost:9092");
props2.put("serializer.class", "kafka.serializer.StringEncoder")
props2.put("zk.connectiontimeout.ms", "15000")
val config: ProducerConfig = new ProducerConfig(props2)
val producer: Producer[String, String] = new Producer[String, String](config)
val data = new KeyedMessage[String, String]("campaign", "test-message, it is ok")
producer.send(data)
producer.close
}
package com.sillycat.superduty.jobs.consumer
import kafka.api.{ FetchRequestBuilder, FetchRequest }
import kafka.javaapi.consumer.SimpleConsumer
import kafka.javaapi.FetchResponse
import kafka.javaapi.message.ByteBufferMessageSet
import scala.collection.JavaConversions._
import java.nio.ByteBuffer
import com.typesafe.scalalogging.slf4j.Logging
object WorkerKafka extends App with Logging {
val simpleConsumer: SimpleConsumer = new SimpleConsumer("localhost", 9092, 10000, 1024000, "worker")
val req: FetchRequest = new FetchRequestBuilder()
.clientId("worker")
.addFetch("test", 0, 0L, 100)
.build()
while (true) {
val fetchResponse: FetchResponse = simpleConsumer.fetch(req)
val messages: ByteBufferMessageSet = fetchResponse.messageSet("test", 0)
messages foreach { msg =>
val buffer: ByteBuffer = msg.message.payload
val messages = new Array[Byte](buffer.remaining())
val bytes = ByteBuffer.wrap(messages)
logger.debug("message=" + bytes.toString)
}
}
}
Solution:
Same codes. try to update the package as follow.
"org.apache.kafka" % "kafka_2.10" % "0.8.0" intransitive(),
"com.yammer.metrics" % "metrics-core" % "2.2.0",
Since I am using the latest version of kafka, I need to configure the auto create topic or manually create topic.
I prefer manually.
Create topic
>bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
List topic
>bin/kafka-list-topic.sh --zookeeper localhost:2181
Send and Consumer the message
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
That is cool.
Later, I will try the scala client and multiple nodes.
References:
Kafka 1~2
http://sillycat.iteye.com/blog/1563312
http://sillycat.iteye.com/blog/1563314
http://kafka.apache.org/
kafka cluster
http://www.jonzobrist.com/2012/04/17/install-apache-kafka-and-zookeeper-on-ubuntu-10-04/
http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/
http://kafka.apache.org/08/quickstart.html
zookeeper
http://sillycat.iteye.com/blog/1556108
http://sillycat.iteye.com/blog/1556141
http://rdc.taobao.com/team/jm/archives/665
http://blog.javachen.com/hadoop/2013/08/23/publish-proerties-using-zookeeper/
http://rdc.taobao.com/team/jm/archives/tag/zookeeper
https://github.com/alibaba/taokeeper |
|