设为首页 收藏本站
查看: 1339|回复: 0

[经验分享] kafka java代码的使用[Producer和Consumer]

[复制链接]

尚未签到

发表于 2017-5-23 18:09:23 | 显示全部楼层 |阅读模式
用java代码对kafka消息进行消费与发送,首先我们得引入相关jar包
 
maven:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
  gradle:

compile("org.apache.kafka:kafka_2.10:0.8.2.1")
 
  在新版本的kafka中(具体版本记不清楚了),添加了java代码实现的producer,consumer目前还是Scala的,之前的producer和consumer均是Scala编写的,在这里则介绍java版本的producer。
  另一点需要特别注意:
  当发送消息时我们不指定key时,producer将消息分发到各partition的机制是:
  Scala版本的producer:在你的producer启动的时候,随机获得一个partition,然后后面的消息都会发送到这个partition,也就是说,只要程序启动了,这个producer都会往同一个partition里发送消息
  java版本的producer会轮询每个partition,所以发送的会比较平均
  所以当使用Scala版本的producer时,尽量传入key,保证消息在partition的平均性
  下面是具体的代码:

package cn.qlt.study.kafka;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.commons.lang.SerializationUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import cn.qlt.study.domain.User;
public class KafkaUtil {

private static KafkaProducer<String, byte[]> producer=null;
private static ConsumerConnector consumer=null;
static{
//生产者配置文件,具体配置可参考ProducerConfig类源码,或者参考官网介绍
Map<String,Object> config=new HashMap<String, Object>();
//kafka服务器地址
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.100.90:9092,192.168.100.91:9092");
//kafka消息序列化类 即将传入对象序列化为字节数组
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
//kafka消息key序列化类 若传入key的值,则根据该key的值进行hash散列计算出在哪个partition上
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024*1024*5);
//往kafka服务器提交消息间隔时间,0则立即提交不等待
config.put(ProducerConfig.LINGER_MS_CONFIG,0);
//消费者配置文件
Properties props = new Properties();
//zookeeper地址
props.put("zookeeper.connect", "192.168.100.90:2181");
//组id
props.put("group.id", "123");
//自动提交消费情况间隔时间
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig consumerConfig=new ConsumerConfig(props);
producer=new KafkaProducer<String,byte[]>(config);
consumer=Consumer.createJavaConsumerConnector(consumerConfig);
}
/**
*启动一个消费程序
* @param topic 要消费的topic名称
* @param handler 自己的处理逻辑的实现
* @param threadCount 消费线程数,该值应小于等于partition个数,多了也没用
*/
public static <T extends Serializable>void startConsumer(String topic,final MqMessageHandler<T> handler,int threadCount) throws Exception{
if(threadCount<1)
throw new Exception("处理消息线程数最少为1");
//设置处理消息线程数,线程数应小于等于partition数量,若线程数大于partition数量,则多余的线程则闲置,不会进行工作
//key:topic名称 value:线程数
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(threadCount));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
//声明一个线程池,用于消费各个partition
ExecutorService executor=Executors.newFixedThreadPool(threadCount);
//获取对应topic的消息队列
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
//为每一个partition分配一个线程去消费
for (final KafkaStream stream : streams) {
executor.execute(new Runnable() {
@Override
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
//有信息则消费,无信息将会阻塞
while (it.hasNext()){
T message=null;
try {
//将字节码反序列化成相应的对象
byte[] bytes=it.next().message();
message = (T) SerializationUtils.deserialize(bytes);
} catch (Exception e) {
e.printStackTrace();
return;
}
//调用自己的业务逻辑
try {
handler.handle(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
}
/**
*发送消息,发送的对象必须是可序列化的
*/
public static Future<RecordMetadata> send(String topic,Serializable value) throws Exception{
try {
//将对象序列化称字节码
byte[] bytes=SerializationUtils.serialize(value);
Future<RecordMetadata> future=producer.send(new ProducerRecord<String,byte[]>(topic,bytes));
return future;
}catch(Exception e){
throw e;
}
}
//内部抽象类 用于实现自己的处理逻辑
public static abstract class MqMessageHandler<T extends Serializable>{
public abstract void handle(T message);
}

public static void main(String[] args) throws Exception {
//发送一个信息
send("test",new User("id","userName", "password"));
//为test启动一个消费者,启动后每次有消息则打印对象信息
KafkaUtil.startConsumer("test", new MqMessageHandler<User>() {
@Override
public void handle(User user) {
//实现自己的处理逻辑,这里只打印出消息
System.out.println(user.toString());
}
},2);
}
}

   
  相关配置解释:
  producer:
  1、producer的配置不需要zookeeper地址,会直接获取kafka的元数据,直接和broker进行通信
  2、ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG即value.serializer,kafka生产者与broker之间数据是以byte进行传递的,所以这个参数的意思是把我们传入对象转换成byte[]的类,一般使用org.apache.kafka.common.serialization.ByteArraySerializer即可,我们自己把对象序列化为byte[]
  3、ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG即key.serializer,首先说明下key值是干什么的,若我们指定了key的值,生产者则会根据该key进行hash散列计算出具体的partition。若不指定,则随机选择partition。一般情况下我们没必要指定该值。这个类与上面功能一样,即将key转换成byte[]
  4、ProducerConfig.LINGER_MS_CONFIG即linger.ms,为了减少请求次数提高吞吐率,这个参数为每次提交间隔的次数,若设置了该值,如1000,则意味着我们的消息可能不会马上提交到kafka服务器,需要等上1秒中,才会进行批量提交。我们可以适当的配置该值。0为不等待立刻提交。
  consumer:
  1、zookeeper.connect:zookeeper的地址,多个之间用,分割
  2、group.id:这个值可以随便写,但建议写点有意义的值,别随便写个123。kafka保证同一个组内的消息只会被消费一次,若需要重复消费消息,则可以配置不同的groupid。
  3、auto.commit.interval.ms:consumer自己会记录消费的偏移量,并定时往zookeeper上提交,该值即为提交时间间隔,若该值设置太大可能会出现重复消费的情况,如我们停止了某个consumer,但该consumer还未往zookeeper提交某段时间的消费记录,这导致我们下次启动该消费者的时候,它会从上次提交的偏移量进行消费,这就导致了某些数据的重复消费。
  注意:在杀死consumer进程后,应等一会儿再去重启,因为杀死consumer进程时,会删除zookeeper的一些临时节点,若我们马上重启的话,可能会在启动的时候那些节点还没删除掉,出现写不必要的错误

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379959-1-1.html 上篇帖子: Kafka 测试环境宕机原因查询(一) 下篇帖子: Kafka学习之Replication tools之List Topic Tool
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表