而事实上,HDFS确实没有用RPC机制传输数据消息。当HDFS中的DFSClient对DataNode上保存的文件数据进行读写的时候,它其实采用了另外一个机制,简单介绍一下:
每个DataNode在启动的时候会创建一个线程DataXceiverServer来专门负责block数据的读写的链接。而DataXceiverServer做的事情很简单 --一旦有一个连接,就创建一个新的DataXceiver来处理这个连接:
public void run() {while (datanode.shouldRun) {try {Socket s = ss.accept();s.setTcpNoDelay(true);new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this)).start();} catch (SocketTimeoutException ignored) {// wake up to see if should continue to run} catch (IOException ie) {// ............} catch (Throwable te) {// ............}}try {ss.close();} catch (IOException ie) {// .......}}
DataXceiver也是一个线程,它负责处理对应的一个连接,主要完成4种任务:
opReadBlock: 读取一个block
opWriteBlock: 写一个block到disk上
opCopyBlock: 读一个block,然后送到指定的目的地
opReplaceBlock: 替换一个block
class DataXceiver extends DataTransferProtocol.Receiverimplements Runnable, FSConstants {// ................/*** Read/write data from/to the DataXceiveServer.*/public void run() {updateCurrentThreadName("Waiting for operation");DataInputStream in=null; try {in = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(s), SMALL_BUFFER_SIZE));final DataTransferProtocol.Op op = readOp(in);// Make sure the xciver count is not exceeded// ....processOp(op, in);} catch (Throwable t) {LOG.error(datanode.dnRegistration + ":DataXceiver",t);} finally {//.....}}/** Process op by the corresponding method. */protected final void processOp(Op op, DataInputStream in) throws IOException {switch(op) {case READ_BLOCK:opReadBlock(in);break;case WRITE_BLOCK:opWriteBlock(in);break;case REPLACE_BLOCK:opReplaceBlock(in);break;case COPY_BLOCK:opCopyBlock(in);break;case BLOCK_CHECKSUM:opBlockChecksum(in);break;default:throw new IOException("Unknown op " + op + " in data stream");}}
所以,当HDFS进行数据传输的时候,对于每一个链接创建一个thread进行处理,这样,如果两个Node之间的数据传输很频繁的话,那么有可能会创建多个链接,吞吐量就上去了。
如果熟悉network server architecture的话,很容易知道HDFS在这里采用的是one thread per request的模型。它没有采用当下流行的基于epoll的event-driven architecture,甚至于,它都没有采用thread pool的方式,而是使用了一种“很土很土”的模型。众所周知,one thread per request模型一个很明显的缺陷在于如果访问的并发数太高,可能产生大量的thread而导致thread间的context swith开销过大。
我个人想法,HDFS之所以采用这样的一个模型,一方面是编程上比较简单,另一个方面,可能是开发人员认为HDFS这样的一个系统并不容易出现高并发的访问。从图中看,需要和DataNode进行数据消息交互的模块有两个:一个是DFSClient,一个是其它的DataNode。
先说后者,DataNode与DataNode之间的数据消息交互只发生在一种情况下,就是某个DFSClient对block进行了写操作,那么被写的DataNode需要将这些数据复制到这个block副本所在的其它DataNode上。是一种链式结构:
DFSClient -->DataNode A -->DataNode B --> DataNode C
所以,DataNode之间的链接是与DFSClient和DataNode之间的链接一一对应的。
再说DFSClient,它并不同于Web Server所要面对的client。Web Serve的client是终端的浏览器,可能成千上万,这是不可控的。而DFSClient是系统内的client,它的数量不会太多(就好比是对Database的连接数,是开发人员控制的,所以不会太大)。由于DFSClient在系统内部的数量不会太多,所以DataNode从DFSClient过来的连接也就不会太多。既然DFSClient发起的连接不多,那么DataNode之间的连接也不会多。
结合以上二点,整个HDFS很难产生高并发的情况,所以采用one thread per request的架构也就说得过去了。