设为首页 收藏本站
查看: 740|回复: 0

[经验分享] Kafka学习笔记-Java简单操作

[复制链接]

尚未签到

发表于 2017-6-2 08:29:55 | 显示全部楼层 |阅读模式
  Maven依赖包:



[plain] view plain copy

  • <dependency>
  •         <groupId>org.apache.kafka</groupId>
  •         <artifactId>kafka-clients</artifactId>
  •         <version>0.8.2.1</version>
  • </dependency>

  • <dependency>
  •     <groupId>org.apache.kafka</groupId>
  •     <artifactId>kafka_2.11</artifactId>
  •     <version>0.8.2.1</version>
  • </dependency>
  
代码如下:



[java] view plain copy

  • import java.util.Properties;  

  • import org.apache.kafka.clients.producer.Callback;  
  • import org.apache.kafka.clients.producer.KafkaProducer;  
  • import org.apache.kafka.clients.producer.ProducerRecord;  
  • import org.apache.kafka.clients.producer.RecordMetadata;  
  • import org.slf4j.Logger;  
  • import org.slf4j.LoggerFactory;  

  • public class KafkaProducerTest {  

  •     private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);  

  •     private static Properties properties = null;  

  •     static {  
  •         properties = new Properties();  
  •         properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");  
  •         properties.put("producer.type", "sync");  
  •         properties.put("request.required.acks", "1");  
  •         properties.put("serializer.class", "kafka.serializer.DefaultEncoder");  
  •         properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");  
  •         properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");  
  • //      properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
  •         properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");  
  • //      properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
  •     }

  •     public void produce() {  
  •         KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[],byte[]>(properties);  
  •         ProducerRecord<byte[],byte[]> kafkaRecord = new ProducerRecord<byte[],byte[]>(  
  •                 "test", "kkk".getBytes(), "vvv".getBytes());  
  •         kafkaProducer.send(kafkaRecord, new Callback() {  
  •             public void onCompletion(RecordMetadata metadata, Exception e) {  
  •                 if(null != e) {  
  •                     LOG.info("the offset of the send record is {}", metadata.offset());  
  •                     LOG.error(e.getMessage(), e);
  •                 }
  •                 LOG.info("complete!");  
  •             }
  •         });
  •         kafkaProducer.close();
  •     }

  •     public static void main(String[] args) {  
  •         KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();  
  •         for (int i = 0; i < 10; i++) {  
  •             kafkaProducerTest.produce();
  •         }
  •     }
  • }



[java] view plain copy

  • import java.util.List;  
  • import java.util.Map;  
  • import java.util.Properties;  

  • import org.apache.kafka.clients.consumer.ConsumerConfig;  
  • import org.apache.kafka.clients.consumer.ConsumerRecord;  
  • import org.apache.kafka.clients.consumer.ConsumerRecords;  
  • import org.apache.kafka.clients.consumer.KafkaConsumer;  
  • import org.slf4j.Logger;  
  • import org.slf4j.LoggerFactory;  

  • public class KafkaConsumerTest {  

  •     private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTest.class);  

  •     public static void main(String[] args) {  
  •         Properties properties = new Properties();  
  •         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
  •                 "centos.master:9092,centos.slave1:9092,centos.slave2:9092");  
  •         properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");              
  •         properties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");              
  •         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");  
  •         properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");  
  • //      properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");  
  •         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");   
  •         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  •                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");  
  •         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  •                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");  

  •         KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(properties);  
  •         kafkaConsumer.subscribe("test");  
  • //      kafkaConsumer.subscribe("*");  
  •         boolean isRunning = true;              
  •         while(isRunning) {  
  •             Map<String, ConsumerRecords<byte[], byte[]>> results = kafkaConsumer.poll(100);  
  •             if (null != results) {  
  •                 for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : results.entrySet()) {  
  •                     LOG.info("topic {}", entry.getKey());  
  •                     ConsumerRecords<byte[], byte[]> consumerRecords = entry.getValue();  
  •                     List<ConsumerRecord<byte[], byte[]>> records = consumerRecords.records();  
  •                     for (int i = 0, len = records.size(); i < len; i++) {  
  •                         ConsumerRecord<byte[], byte[]> consumerRecord = records.get(i);  
  •                         LOG.info("topic {} partition {}", consumerRecord.topic(), consumerRecord.partition());  
  •                         try {  
  •                             LOG.info("offset {} value {}", consumerRecord.offset(), new String(consumerRecord.value()));  
  •                         } catch (Exception e) {  
  •                             LOG.error(e.getMessage(), e);
  •                         }
  •                     }
  •                 }
  •             }
  •         }

  •         kafkaConsumer.close();

  •     }

  • }
  发现KafkaConsumer的poll方法未实现



[java] view plain copy

  • @Override  
  • public Map<String, ConsumerRecords<K,V>> poll(long timeout) {  
  •      // TODO Auto-generated method stub  
  •      return null;  
  • }
  
后改为kafka.javaapi.consumer.SimpleConsumer实现,正常运行



[java] view plain copy

    • import java.nio.ByteBuffer;  
    • import java.util.ArrayList;  
    • import java.util.Collections;  
    • import java.util.HashMap;  
    • import java.util.List;  
    • import java.util.Map;  

    • import kafka.api.FetchRequest;  
    • import kafka.api.FetchRequestBuilder;  
    • import kafka.api.PartitionOffsetRequestInfo;  
    • import kafka.cluster.Broker;  
    • import kafka.common.ErrorMapping;  
    • import kafka.common.TopicAndPartition;  
    • import kafka.javaapi.FetchResponse;  
    • import kafka.javaapi.OffsetRequest;  
    • import kafka.javaapi.OffsetResponse;  
    • import kafka.javaapi.PartitionMetadata;  
    • import kafka.javaapi.TopicMetadata;  
    • import kafka.javaapi.TopicMetadataRequest;  
    • import kafka.javaapi.TopicMetadataResponse;  
    • import kafka.javaapi.consumer.SimpleConsumer;  
    • import kafka.message.MessageAndOffset;  

    • public class KafkaSimpleConsumerTest {  

    •     private List<String> borkerList = new ArrayList<String>();   

    •     public KafkaSimpleConsumerTest() {   
    •         borkerList = new ArrayList<String>();   
    •     }

    •     public static void main(String args[]) {   
    •         KafkaSimpleConsumerTest kafkaSimpleConsumer = new KafkaSimpleConsumerTest();   
    •         // 最大读取消息数量   
    •         long maxReadNum = Long.parseLong("3");   
    •         // 订阅的topic   
    •         String topic = "test";   
    •         // 查找的分区   
    •         int partition = Integer.parseInt("0");   
    •         // broker节点  
    •         List<String> seeds = new ArrayList<String>();   
    •         seeds.add("centos.master");   
    •         seeds.add("centos.slave1");   
    •         seeds.add("centos.slave2");   
    •         // 端口   
    •         int port = Integer.parseInt("9092");   
    •         try {   
    •             kafkaSimpleConsumer.run(maxReadNum, topic, partition, seeds, port);
    •         } catch (Exception e) {   
    •             System.out.println("Oops:" + e);   
    •             e.printStackTrace();
    •         }
    •     }

    •     public void run(long maxReadNum, String topic, int partition, List<String> seedBrokers, int port) throws Exception {   
    •         // 获取指定topic partition的元数据   
    •         PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
    •         if (metadata == null) {   
    •             System.out.println("can't find metadata for topic and partition. exit");   
    •             return;   
    •         }
    •         if (metadata.leader() == null) {   
    •             System.out.println("can't find leader for topic and partition. exit");   
    •             return;   
    •         }
    •         String leadBroker = metadata.leader().host();
    •         String clientName = "client_" + topic + "_" + partition;   

    •         SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);   
    •         long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);   
    •         int numErrors = 0;   
    •         while (maxReadNum > 0) {   
    •             if (consumer == null) {   
    •                 consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);   
    •             }
    •             FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partition, readOffset, 100000).build();   
    •             FetchResponse fetchResponse = consumer.fetch(req);

    •             if (fetchResponse.hasError()) {   
    •                 numErrors++;
    •                 short code = fetchResponse.errorCode(topic, partition);   
    •                 System.out.println("error fetching data from the broker:" + leadBroker + " reason: " + code);   
    •                 if (numErrors > 5)   
    •                     break;   
    •                 if (code == ErrorMapping.OffsetOutOfRangeCode()) {   
    •                     readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
    •                     continue;   
    •                 }
    •                 consumer.close();
    •                 consumer = null;   
    •                 leadBroker = findNewLeader(leadBroker, topic, partition, port);
    •                 continue;   
    •             }
    •             numErrors = 0;   

    •             long numRead = 0;   
    •             for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {   
    •                 long currentOffset = messageAndOffset.offset();   
    •                 if (currentOffset < readOffset) {   
    •                     System.out.println("found an old offset: " + currentOffset + " expecting: " + readOffset);   
    •                     continue;   
    •                 }

    •                 readOffset = messageAndOffset.nextOffset();
    •                 ByteBuffer payload = messageAndOffset.message().payload();

    •                 byte[] bytes = new byte[payload.limit()];   
    •                 payload.get(bytes);
    •                 System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));   
    •                 numRead++;
    •                 maxReadNum--;
    •             }

    •             if (numRead == 0) {   
    •                 try {   
    •                     Thread.sleep(1000);   
    •                 } catch (InterruptedException ie) {   
    •                 }
    •             }
    •         }
    •         if (consumer != null)   
    •             consumer.close();
    •     }

    •     /**
    •      * 从活跃的Broker列表中找出指定Topic、Partition中的Leader Broker
    •      * @param seedBrokers
    •      * @param port
    •      * @param topic
    •      * @param partition
    •      * @return
    •      */  
    •     private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {   
    •         PartitionMetadata partitionMetadata = null;   
    •         loop: for (String seedBroker : seedBrokers) {   
    •             SimpleConsumer consumer = null;   
    •             try {   
    •                 consumer = new SimpleConsumer(seedBroker, port, 100000, 64 * 1024, "leaderLookup");   
    •                 List<String> topics = Collections.singletonList(topic);
    •                 TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);   
    •                 TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);

    •                 List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();
    •                 for (TopicMetadata topicMetadata : topicMetadatas) {   
    •                     for (PartitionMetadata pMetadata : topicMetadata.partitionsMetadata()) {   
    •                         if (pMetadata.partitionId() == partition) {   
    •                             partitionMetadata = pMetadata;
    •                             break loop;   
    •                         }
    •                     }
    •                 }
    •             } catch (Exception e) {   
    •                 System.out.println("error communicating with broker [" + seedBroker + "] to find leader for [" + topic + ", " + partition + "] reason: " + e);   
    •             } finally {   
    •                 if (consumer != null)   
    •                     consumer.close();
    •             }
    •         }
    •         if (partitionMetadata != null) {   
    •             borkerList.clear();
    •             for (Broker replica : partitionMetadata.replicas()) {   
    •                 borkerList.add(replica.host());
    •             }
    •         }
    •         return partitionMetadata;   
    •     }

    •     public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {   
    •         TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);   
    •         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();   
    •         requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));   
    •         OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);   
    •         OffsetResponse response = consumer.getOffsetsBefore(request);
    •         if (response.hasError()) {   
    •             System.out.println("error fetching data offset data the broker. reason: " + response.errorCode(topic, partition));   
    •             return 0;   
    •         }
    •         long[] offsets = response.offsets(topic, partition);   
    •         return offsets[0];   
    •     }

    •     private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {   
    •         for (int i = 0; i < 3; i++) {   
    •             boolean goToSleep = false;   
    •             PartitionMetadata metadata = findLeader(borkerList, port, topic, partition);
    •             if (metadata == null) {   
    •                 goToSleep = true;   
    •             } else if (metadata.leader() == null) {   
    •                 goToSleep = true;   
    •             } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {   
    •                 goToSleep = true;   
    •             } else {   
    •                 return metadata.leader().host();   
    •             }
    •             if (goToSleep) {   
    •                 try {   
    •                     Thread.sleep(1000);   
    •                 } catch (InterruptedException ie) {   
    •                 }
    •             }
    •         }
    •         System.out.println("unable to find new leader after broker failure. exit");   
    •         throw new Exception("unable to find new leader after broker failure. exit");   
    •     }

    • }



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-382540-1-1.html 上篇帖子: CentOS 7部署Kafka和Kafka集群 下篇帖子: Kafka~服务端几个常用的命令
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表