Kafka源码深度解析-序列3 -Producer -Java NIO
@Overridepublic void poll(long timeout) throws IOException {
。。。
clear(); //清空各种状态
if (hasStagedReceives())
timeout = 0;
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);//轮询
long endSelect = time.nanoseconds();
currentTimeNanos = endSelect;
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0) { Set keys = this.nioSelector.selectedKeys();
Iterator iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
KafkaChannel channel = channel(key);
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
lruConnections.put(channel.id(), currentTimeNanos);
try {
if (key.isConnectable()) {//有连接事件
channel.finishConnect();
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
}
if (channel.isConnected() && !channel.ready())
channel.prepare(); //这个只有需要安全检查的SSL需求,普通的不加密的channel,prepare()为空实现
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { //读就绪
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive); //实际的读动作
}
if (channel.ready() && key.isWritable()) {//写就绪
Send send = channel.write(); //实际的写动作
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
} /* cancel any defunct sockets */
if (!key.isValid()) {
close(channel);
this.disconnected.add(channel.id());
}
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel);
this.disconnected.add(channel.id());
}
}
}
addToCompletedReceives();
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
maybeCloseOldestConnection();
}1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
页:
[1]