设为首页 收藏本站
查看: 1311|回复: 0

[经验分享] Kafka(2)Install ubuntu and Try more JAVA client

[复制链接]

尚未签到

发表于 2016-4-27 12:05:47 | 显示全部楼层 |阅读模式
Kafka(2)Install ubuntu and Try more JAVA client

1. Try to setup this on windows.
download and install this file
http://scalasbt.artifactoryonline.com/scalasbt/sbt-native-packages/org/scala-sbt/sbt-launcher/0.11.3/sbt.msi

Unzip the kafka to working directory:
D:\tool\kafka-0.7.0
>sbt update
>sbt package

sbt is installed on windows, but still, it is hard to install kafka on windows

2. Try to setup on ubuntu12.04
>wget http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
>tar zxvf kafka-0.7.0-incubating-src.tar.gz
>mv kafka-0.7.0-incubating-src /opt/tools/kafka-0.7.0
>cd /opt/tools/kafka-0.7.0
>./sbt update
>./sbt package

start the server
>bin/zookeeper-server-start.sh config/zookeeper.properties
>bin/kafka-server-start.sh config/server.properties


3. Fix the Java Client Problem
Error Message:
[2012-06-11 17:55:00,109] WARN Exception causing close of session 0x137daf68ab70001 due to java.io.IOException: Connection reset by peer (org.apache.zookeeper.server.NIOServerCnxn)
[2012-06-11 17:55:00,110] INFO Closed socket connection for client /192.168.56.1:62003 which had sessionid 0x137daf68ab70001 (org.apache.zookeeper.server.NIOServerCnxn)

Solution:
server.properties
# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
# may not be what you want.
hostname=x.x.x.x

#zk.connect=localhost:2181
zk.connect=x.x.x.x:2181

# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
zk.sessiontimeout.ms=60000

zookeeper.properties
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=8000

We need to use real ip address here in configuration.

The Java Client sample codes are under this directory: D:\book\distributed\kafka-0.7.0-incubating-src\examples\src\main\java\kafka\examples

The class will be as follow:
package com.sillycat.magicneptune.example;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

public class TestProducerMain {

public static void main(String[] args) {
Properties props2 = new Properties();
props2.put("zk.connect", "192.168.56.101:2181");
props2.put("serializer.class", "kafka.serializer.StringEncoder");
// This is added by myself for changing the default timeout 6000.
props2.put("zk.connectiontimeout.ms", "15000");
ProducerConfig config = new ProducerConfig(props2);
Producer<String, String> producer = new Producer<String, String>(config);

// The message is sent to a randomly selected partition registered in ZK
ProducerData<String, String> data = new ProducerData<String, String>(
"test", "test-message,it is ok now.adsfasdf1111222");
producer.send(data);
producer.close();
}
}

package com.sillycat.magicneptune.example;

import java.net.InetAddress;
import java.net.UnknownHostException;

import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

public class TestConsumerMain {

public static void main(String[] args) {


try {
System.out.println(InetAddress.getLocalHost().getHostAddress());
} catch (UnknownHostException e) {
e.printStackTrace();
}
SimpleConsumer consumer = new SimpleConsumer("192.168.56.101", 9092, 10000,
1024000);

long offset = 0;
while (true) {
// create a fetch request for topic   test  , partition 0, current
// offset, and fetch size of 1MB
FetchRequest fetchRequest = new FetchRequest("test", 0, offset,
1000000);

// get the message set from the consumer and print them out
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
for (MessageAndOffset msg : messages) {
System.out.println(ExampleUtils.getMessage(msg.message()) + "offset=" + offset);
// advance the offset after consuming each message
offset = msg.offset();
}
}
//consumer.close();
}

}


package com.sillycat.magicneptune.example;

import java.nio.ByteBuffer;

import kafka.message.Message;

public class ExampleUtils
{
  public static String getMessage(Message message)
  {
    ByteBuffer buffer = message.payload();
    byte [] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes);
  }
}


references:
http://www.jonzobrist.com/2012/04/17/install-apache-kafka-and-zookeeper-on-ubuntu-10-04/
https://github.com/harrah/xsbt/wiki/Getting-Started-Setup
http://incubator.apache.org/kafka/faq.html
http://incubator.apache.org/kafka/quickstart.html
http://blog.sina.com.cn/s/blog_3fe961ae01011o4z.html
http://incubator.apache.org/kafka/faq.html

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-209586-1-1.html 上篇帖子: 详解Kafka生产者Producer配置 下篇帖子: Kafka消费者Consumer常用配置
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表