日期:2014-05-16  浏览次数:20771 次

Hadoop异步rpc通信机制--org.apache.hadoop.ipc.Server
Java NOI非阻塞技术不是开启线程去等待端口的响应,而是采用Reactor模式或Observer模式监听I/O端口,当端口有响应时,会自动通知我们,从而实现流畅的I/O读写。

Java NOI中selector可视为一个观察者,只要我们把要观察的SocketChannel告诉Selector(注册的方式),我们就可以做其余的事情,等到已告知Channel上有事情发生时,Selector会通知我们,传回一组SelectionKey,我们读取这些Key,就可以获得Channel上的数据了。

Client端的底层通信直接采用了阻塞式IO编程,Server是采用Java NIO机制进行RPC通信:

java NIO参考资料:

http://www.iteye.com/topic/834447

http://weixiaolu.iteye.com/blog/1479656

=========================================================================================================================

Server是一个abstract类,抽象之处在call方法中,RPC.Server是ipc.Server的实现类,RPC.Server的构造函数调用了ipc.Server类的构造函数的,Namenode在初始化时调用RPC.getServer方法初始化了RPC.Server:

public static Server getServer(final Object instance, final String bindAddress, final int port,
                                 final int numHandlers,
                                 final boolean verbose, Configuration conf,
                                 SecretManager<? extends TokenIdentifier> secretManager) 
    throws IOException {
    return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
  }

Server.Call是一个请求类,类似Client.Call,只是添加了Call的时间戳机制:

private static class Call {
    private int id;                       // 请求id
    private Writable param;               // 请求的参数
    private Connection connection;        // 和Client一样,表示一个C/S间的连接
    private long timestamp;               // 时间戳
    private ByteBuffer response;          // server对此次请求的响应结果
...
}

知道了Client.Connection后,显然Server.Connection就是Server到Client的连接。Server.Connection内保存了Client的地址,用于灾难恢复。Server.Connection通过调用readAndProcess对Client进行一些操作:版本校验,读数据头processHeader(获取通信协议protocol,根据头部的ugi信息创建user对象)以及读数据processData(获取Client发送过来的Call.id和params,根据二者建立一个请求call,并将请求call入队callQueue)【readAndProcess方法是在Listener.doRead时调用,此时监听器监听到新连接的读数据事件】。

    public int readAndProcess() throws IOException, InterruptedException {
    	//先对connection进行版本校验,校验成功后读取Header头部信息(得到客户端所用的协议和客户端的标识user)
    	//,接着读取数据(Call.id和参数params,其中params),然后建立一个Call
      while (true) {
        /* Read at most one RPC. If the header is not read completely yet
         * then iterate until we read first RPC or until there is no data left.
         */    
        int count = -1;
        if (dataLengthBuffer.remaining() > 0) {
          count = channelRead(channel, dataLengthBuffer);       
          if (count < 0 || dataLengthBuffer.remaining() > 0) 
            return count;
        }
      
        if (!versionRead) {//尚未版本验证
          //Every connection is expected to send the header.
          ByteBuffer versionBuffer = ByteBuffer.allocate(1);
          count = channelRead(channel, versionBuffer);
          if (count <= 0) {
            return count;
          }
          int version = versionBuffer.get(0);
          //要读取BufferByte前要先flip下
          dataLengthBuffer.flip();//.flip();一定得有,如果没有,就是从最后开始读取的,当然读出来的都是byte=0时候的字符。
          //通过buffer.flip();这个语句,就能把buffer的当前位置更改为buffer缓冲区的第一个位置          
          if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
            //Warning is ok since this is not supposed to happen.
            LOG.warn("Incorrect header or version mismatch from " + 
                     hostAddress + ":" + remotePort +
                     " got version " + version + 
                     " expected version " + CURRENT_VERSION);
            return -1;
          }
          dataLengthBuffer.clear();//清除内容
          versionRead = true;//验证版本了
          continue;
        }
        
        if (data == null) {//分配新的data
          dataLengthBuffer.flip();
          dataLength = dataLengthBu