设为首页 收藏本站
查看: 1182|回复: 0

[经验分享] kafka之生产、消费关系

[复制链接]

尚未签到

发表于 2017-5-23 18:02:05 | 显示全部楼层 |阅读模式
  直入主题:Kafka是一个消息系统,通过消费端订阅生产端,从而消费所需的数据。
  问题的产生原因是生成端发送大量数据,但是海量的数据只对应一个topic,且对这个topic开辟多个分区并未成功发送数据,因此自己测试了生成端发送数据至一个topic,十个分区。生产方发送数据至一个toppic,十个分区中,消费端采用十个线程采集这一个topic与十个分区的数据(建议数据量大的数据可以采用创建多个topic并每个topic对应多个分区,这个可以大大提高采集数据的效率)。结果代码如下,可直接粘贴运行:
  生成端模拟发送数据至一个topic并创建十个分区代码(在创建一个topic多个分区的方法之下有创建一个topic一个分区的默认的方法,这里使用的上面那个方法,一个topic对应十个分区且每个分区中发送6条数据):

public KafkaProducer() {
Properties props = new Properties();
props.put("zookeeper.connect", "xxxx:2181,xxxx:2181,xxxx:2181");
//props.put("zookeeper.connect", "localhost:2181");
// 指定序列化处理类,默认为kafka.serializer.DefaultEncoder,即byte[]
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 同步还是异步,默认2表同步,1表异步。异步可以提高发送吞吐量,但是也可能导致丢失未发送过去的消息
props.put("producer.type", "sync");
// 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
props.put("compression.codec", "1");
// 指定kafka节点列表,用于获取metadata(元数据),不必全部指定
props.put("metadata.broker.list", "lognn1te:6667,lognn2te:6667,logrmte:6667");
config = new ProducerConfig(props);
}
@Override
public void run() {
producer = new Producer<String, String>(config);
for(int i = 1; i <= 5; i++){ //5个分区
List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>();
for(int j = 0; j < 6; j++){ //每个分区6条讯息
//针对topic创建相应分区数并发送数据
messageList.add(new KeyedMessage<String, String>("wujun", "我是分区名称partition[" + i + "]", "我是发送的内容message[The " + i + " message]"));
producer.send(messageList);
}
}
//针对topic创建一个分区并发送数据
//List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>();
//  for(int i = 1; i <= 10; i++){
//messageList.add(new KeyedMessage<String, String>("wj",  "我是发送的内容message"+i));
//}
//producer.send(messageList);
//producer.close();
//}
}
public static void main(String[] args) {
Thread t = new Thread(new KafkaProducer());
t.start();
}
}
  以上是生成端的代码,发送10条数据至一个topic(wj)十个分区中
  消费端代码:

public class testKafka implements Runnable {
public void run() {
//所要开辟的线程数量
final int a_numThreads = 5;
//对应的kafka采集地址(本地测试的可以写为localhost:2181),对个地址用逗号隔开
String zk = "lognn1te:2181,lognn2te:2181,logrmte:2181";
//topic,与发送端生成的topic名称一致
String topic = "wj";
//groupid,生成端从新生成一个topic,消费端消费时最好变动groupid
String groupId = "test";
Properties props = new Properties();
props.put("zookeeper.connect", zk);
props.put("zookeeper.connectiontimeout.ms", "30000");
props.put("group.id", groupId);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
//所要开辟的线程数量
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
//使用相应数量的线程数量采集数据
executor.submit(new KafkaConsumerThread(stream, threadNumber));
//查看消费对应的线程
threadNumber++;
}
}
public static void main(String[] args) {
Thread t = new Thread(new testKafka());
t.start();
}
}

public class KafkaConsumerThread implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public KafkaConsumerThread(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext()){
//        System.out.println(Thread.currentThread().getName() + ":" +"partition:["+ mam.partition() +"]"+ "," + new String(mam.message()));
System.out.println("使用线程:" + m_threadNumber + "   发送的内容:" + new String(it.next().message()));
//        System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
}
  以上两个类是消费端的代码,消费端使用了5个线程去采集该topic十个分区中的数据,以下是测试的结果:
  打印结果:

使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 2 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 2 message]
使用线程:3   发送的内容:我是发送的内容message[The 3 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 2 message]
使用线程:3   发送的内容:我是发送的内容message[The 3 message]
使用线程:3   发送的内容:我是发送的内容message[The 4 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 2 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 2 message]
使用线程:3   发送的内容:我是发送的内容message[The 3 message]
使用线程:3   发送的内容:我是发送的内容message[The 0 message]
使用线程:3   发送的内容:我是发送的内容message[The 1 message]
使用线程:3   发送的内容:我是发送的内容message[The 2 message]
使用线程:3   发送的内容:我是发送的内容message[The 3 message]
使用线程:3   发送的内容:我是发送的内容message[The 4 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 3 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 3 message]
使用线程:4   发送的内容:我是发送的内容message[The 4 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 3 message]
使用线程:4   发送的内容:我是发送的内容message[The 0 message]
使用线程:4   发送的内容:我是发送的内容message[The 1 message]
使用线程:4   发送的内容:我是发送的内容message[The 2 message]
使用线程:4   发送的内容:我是发送的内容message[The 3 message]
使用线程:4   发送的内容:我是发送的内容message[The 4 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 2 message]
使用线程:1   发送的内容:我是发送的内容message[The 3 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 2 message]
使用线程:1   发送的内容:我是发送的内容message[The 3 message]
使用线程:1   发送的内容:我是发送的内容message[The 4 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:2   发送的内容:我是发送的内容message[The 0 message]
使用线程:2   发送的内容:我是发送的内容message[The 0 message]
使用线程:2   发送的内容:我是发送的内容message[The 1 message]
使用线程:2   发送的内容:我是发送的内容message[The 0 message]
使用线程:2   发送的内容:我是发送的内容message[The 1 message]
使用线程:2   发送的内容:我是发送的内容message[The 2 message]
使用线程:2   发送的内容:我是发送的内容message[The 0 message]
使用线程:2   发送的内容:我是发送的内容message[The 1 message]
使用线程:2   发送的内容:我是发送的内容message[The 2 message]
使用线程:2   发送的内容:我是发送的内容message[The 3 message]
使用线程:2   发送的内容:我是发送的内容message[The 0 message]
使用线程:2   发送的内容:我是发送的内容message[The 1 message]
使用线程:2   发送的内容:我是发送的内容message[The 2 message]
使用线程:2   发送的内容:我是发送的内容message[The 3 message]
使用线程:2   发送的内容:我是发送的内容message[The 4 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 2 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 2 message]
使用线程:1   发送的内容:我是发送的内容message[The 3 message]
使用线程:1   发送的内容:我是发送的内容message[The 0 message]
使用线程:1   发送的内容:我是发送的内容message[The 1 message]
使用线程:1   发送的内容:我是发送的内容message[The 2 message]
使用线程:1   发送的内容:我是发送的内容message[The 3 message]
使用线程:1   发送的内容:我是发送的内容message[The 4 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 3 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 3 message]
使用线程:0   发送的内容:我是发送的内容message[The 4 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 3 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 3 message]
使用线程:0   发送的内容:我是发送的内容message[The 4 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 3 message]
使用线程:0   发送的内容:我是发送的内容message[The 0 message]
使用线程:0   发送的内容:我是发送的内容message[The 1 message]
使用线程:0   发送的内容:我是发送的内容message[The 2 message]
使用线程:0   发送的内容:我是发送的内容message[The 3 message]
使用线程:0   发送的内容:我是发送的内容message[The 4 message]
  从以上的打印结果可以清楚的说明结果:
  1.当使用的分区数大于开辟的线程数,消费端消费数据时会有一个线程同时采集1个以上分区的数据(不会出现一个分区对应多个线程的情况,这样采集数据会重复混乱)当某个分区中的数据较少时,采集的线程快速的采完了该分区的数据,处于空闲的状态则有可能从新分给别的分区进行采集任务。
  2.当使用的分区数等于或者小于线程数且每个分区数据量比较大时,这样就会一个线程对应采集一个分区中的数据,开辟多余的线程数处于闲置状态。
  以上是消费端,生产端对应topic与分区数量所采用的线程大小采集问题总结。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379951-1-1.html 上篇帖子: 【Spark五十三】Spark Streaming整合Kafka 下篇帖子: Kafka学习之源代码环境搭建(eclipse)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表