问题:
<Failed to update metadata after 3000 ms.>
sender类的发送数据时候,会
List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);//nio 发送数据
NetworkClient类,方法poll,检查metadata是否需要更新
方法:
/**
* Add a metadata request to the list of sends if we can make one
*/
private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
// Beware that the behavior of this method and the computation of timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
// 最新的可用node
Node node = this.leastLoadedNode(now);
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
// mark the timestamp for no node available to connect
this.lastNoNodeAvailableMs = now;
return;
}
log.debug("Trying to send metadata request to node {}", node.id());
if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
Set<String> topics = metadata.topics();
this.metadataFetchInProgress = true;
//生成metadata请求,加入到sends队列中
ClientRequest metadataRequest = metadataRequest(now, node.id(), topics);
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
sends.add(metadataRequest.request());
this.inFlightRequests.add(metadataRequest);
} else if (connectionStates.canConnect(node.id(), now)) {
// we don't have a connection to this node right now, make one
log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id());
initiateConnect(node, now);
// If initiateConnect failed immediately, this node will be put into blackout and we
// should allow immediately retrying in case there is another candidate node. If it
// is still connecting, the worst case is that we end up setting a longer timeout
// on the next round and then wait for the response.
} else { // connected, but can't send more OR connecting
// In either case, we just need to wait for a network tevent to let us know the seleced
// connection might be usable again.
this.lastNoNodeAvailableMs = now;
}
}
选择一个请求最少,并且链接状态可用的host,作为获取metadata的host
这里
/**
* Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will
* prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
* connection if all existing connections are in use. This method will never choose a node for which there is no
* existing connection and from which we have disconnected within the reconnect backoff period.
* @return The node with the fewest in-flight requests.
*/
public Node leastLoadedNode(long now) {
List<Node> nodes = this.metadata.fetch().nodes();
int inflight = Integer.MAX_VALUE;
Node found = null;
for (int i = 0; i < nodes.size(); i++) {
int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
Node node = nodes.get(idx);
int currInflight = this.inFlightRequests.inFlightRequestCount(node.id());
if (currInflight == 0 && this.connectionStates.isConnected(node.id())) {
// if we find an established connection with no in-flight requests we can stop right away
return node;
} else if (!this.connectionStates.isBlackedOut(node.id(), now) && currInflight < inflight) {
// otherwise if this is the best we have found so far, record that
inflight = currInflight;
found = node;
}
}
return found;
}