设为首页 收藏本站
查看: 1077|回复: 0

[经验分享] kafka客户端API

[复制链接]
发表于 2019-1-31 10:19:17 | 显示全部楼层 |阅读模式
Kafka Producer APIs
  Procuder API有两种:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它们都实现了同一个接口:

  •   class Producer {

  •   /* 将消息发送到指定分区 */
  •   publicvoid send(kafka.javaapi.producer.ProducerData producerData);

  •   /* 批量发送一批消息 */
  •   publicvoid send(java.util.List producerData);

  •   /* 关闭producer */
  •   publicvoid close();

  •   }
  Producer API提供了以下功能:

  •   可以将多个消息缓存到本地队列里,然后异步的批量发送到broker,可以通过参数producer.type=async做到。缓存的大小可以通过一些参数指定:queue.time和batch.size。一个后台线程((kafka.producer.async.ProducerSendThread)从队列中取出数据并让kafka.producer.EventHandler将消息发送到broker,也可以通过参数event.handler定制handler,在producer端处理数据的不同的阶段注册处理器,比如可以对这一过程进行日志追踪,或进行一些监控。只需实现kafka.producer.async.CallbackHandler接口,并在callback.handler中配置。
  •   自己编写Encoder来序列化消息,只需实现下面这个接口。默认的Encoder是kafka.serializer.DefaultEncoder。

    •   interface Encoder {
    •   public Message toMessage(T data);
    •   }

  •   提供了基于Zookeeper的broker自动感知能力,可以通过参数zk.connect实现。如果不使用Zookeeper,也可以使用broker.list参数指定一个静态的brokers列表,这样消息将被随机的发送到一个broker上,一旦选中的broker失败了,消息发送也就失败了。
  •   通过分区函数kafka.producer.Partitioner类对消息分区。分区函数有两个参数:key和可用的分区数量,从分区列表中选择一个分区并返回id。默认的分区策略是hash(key)%numPartitions.如果key是null,就随机的选择一个。可以通过参数partitioner.class定制分区函数。

    •   interface Partitioner {
    •   int partition(T key, int numPartitions);
    •   }

KafKa Consumer APIs
  Consumer API有两个级别。低级别的和一个指定的broker保持连接,并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。
  高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还可以自己维护消费状态,并可以通过一些条件指定订阅特定的topic,比如白名单黑名单或者正则表达式。
低级别的API

  •   class SimpleConsumer {

  •   /*向一个broker发送读取请求并得到消息集 */
  •   public ByteBufferMessageSet fetch(FetchRequest request);

  •   /*向一个broker发送读取请求并得到一个相应集 */
  •   public MultiFetchResponse multifetch(List fetches);

  •   /**
  •   * 得到指定时间之前的offsets
  •   * 返回值是offsets列表,以倒序排序
  •   * @param time: 时间,毫秒,
  •   * 如果指定为OffsetRequest$.MODULE$.LATIEST_TIME(), 得到最新的offset.
  •   * 如果指定为OffsetRequest$.MODULE$.EARLIEST_TIME(),得到最老的offset.
  •   */
  •   publiclong[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
  •   }
  低级别的API是高级别API实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比如Hadoop consumer这样的离线consumer。
高级别的API

  •   /* 创建连接 */
  •   ConsumerConnector connector = Consumer.create(consumerConfig);

  •   interface ConsumerConnector {

  •   /**
  •   * 这个方法可以得到一个流的列表,每个流都是MessageAndMetadata的迭代,通过MessageAndMetadata可以拿到消息和其他的元数据(目前之后topic)
  •   * Input: a map of
  •   * Output: a map of
  •   */
  •   public Map createMessageStreams(Map topicCountMap);

  •   /**
  •   * 你也可以得到一个流的列表,它包含了符合TopicFiler的消息的迭代,
  •   * 一个TopicFilter是一个封装了白名单或黑名单的正则表达式。
  •   */
  •   public List createMessageStreamsByFilter(
  •   TopicFilter topicFilter, int numStreams);

  •   /* 提交目前消费到的offset */
  •   public commitOffsets()

  •   /* 关闭连接 */
  •   public shutdown()
  •   }
  这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。
  每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的tipic。


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669936-1-1.html 上篇帖子: Idea下Kafka源码阅读编译环境搭建 下篇帖子: Kafka无法消费?!我的分布式消息服务Kafka却稳如泰山!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表