设为首页 收藏本站
查看: 1268|回复: 0

[经验分享] kafka 获取metadata

[复制链接]

尚未签到

发表于 2017-5-23 17:43:45 | 显示全部楼层 |阅读模式
  问题:
  <Failed to update metadata after 3000 ms.>
  sender类的发送数据时候,会
  List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);//nio 发送数据
  NetworkClient类,方法poll,检查metadata是否需要更新
  方法:

    /**
* Add a metadata request to the list of sends if we can make one
*/
private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
// Beware that the behavior of this method and the computation of timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
// 最新的可用node
Node node = this.leastLoadedNode(now);
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
// mark the timestamp for no node available to connect
this.lastNoNodeAvailableMs = now;
return;
}
log.debug("Trying to send metadata request to node {}", node.id());
if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
Set<String> topics = metadata.topics();
this.metadataFetchInProgress = true;
//生成metadata请求,加入到sends队列中
ClientRequest metadataRequest = metadataRequest(now, node.id(), topics);
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
sends.add(metadataRequest.request());
this.inFlightRequests.add(metadataRequest);
} else if (connectionStates.canConnect(node.id(), now)) {
// we don't have a connection to this node right now, make one
log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id());
initiateConnect(node, now);
// If initiateConnect failed immediately, this node will be put into blackout and we
// should allow immediately retrying in case there is another candidate node. If it
// is still connecting, the worst case is that we end up setting a longer timeout
// on the next round and then wait for the response.
} else { // connected, but can't send more OR connecting
// In either case, we just need to wait for a network tevent to let us know the seleced
// connection might be usable again.
this.lastNoNodeAvailableMs = now;
}
}
  选择一个请求最少,并且链接状态可用的host,作为获取metadata的host
  这里

/**
* Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will
* prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
* connection if all existing connections are in use. This method will never choose a node for which there is no
* existing connection and from which we have disconnected within the reconnect backoff period.
* @return The node with the fewest in-flight requests.
*/
public Node leastLoadedNode(long now) {
List<Node> nodes = this.metadata.fetch().nodes();
int inflight = Integer.MAX_VALUE;
Node found = null;
for (int i = 0; i < nodes.size(); i++) {
int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
Node node = nodes.get(idx);
int currInflight = this.inFlightRequests.inFlightRequestCount(node.id());
if (currInflight == 0 && this.connectionStates.isConnected(node.id())) {
// if we find an established connection with no in-flight requests we can stop right away
return node;
} else if (!this.connectionStates.isBlackedOut(node.id(), now) && currInflight < inflight) {
// otherwise if this is the best we have found so far, record that
inflight = currInflight;
found = node;
}
}
return found;
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379928-1-1.html 上篇帖子: kafka TopicConfigManager类 下篇帖子: kafka学习笔记1之简介
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表