日期:2014-05-16 浏览次数:20978 次
Java NOI中selector可视为一个观察者,只要我们把要观察的SocketChannel告诉Selector(注册的方式),我们就可以做其余的事情,等到已告知Channel上有事情发生时,Selector会通知我们,传回一组SelectionKey,我们读取这些Key,就可以获得Channel上的数据了。
Client端的底层通信直接采用了阻塞式IO编程,Server是采用Java NIO机制进行RPC通信:
java NIO参考资料:
http://www.iteye.com/topic/834447
http://weixiaolu.iteye.com/blog/1479656
=========================================================================================================================
Server是一个abstract类,抽象之处在call方法中,RPC.Server是ipc.Server的实现类,RPC.Server的构造函数调用了ipc.Server类的构造函数的,Namenode在初始化时调用RPC.getServer方法初始化了RPC.Server:
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers,
final boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}private static class Call {
private int id; // 请求id
private Writable param; // 请求的参数
private Connection connection; // 和Client一样,表示一个C/S间的连接
private long timestamp; // 时间戳
private ByteBuffer response; // server对此次请求的响应结果
...
}
public int readAndProcess() throws IOException, InterruptedException {
//先对connection进行版本校验,校验成功后读取Header头部信息(得到客户端所用的协议和客户端的标识user)
//,接着读取数据(Call.id和参数params,其中params),然后建立一个Call
while (true) {
/* Read at most one RPC. If the header is not read completely yet
* then iterate until we read first RPC or until there is no data left.
*/
int count = -1;
if (dataLengthBuffer.remaining() > 0) {
count = channelRead(channel, dataLengthBuffer);
if (count < 0 || dataLengthBuffer.remaining() > 0)
return count;
}
if (!versionRead) {//尚未版本验证
//Every connection is expected to send the header.
ByteBuffer versionBuffer = ByteBuffer.allocate(1);
count = channelRead(channel, versionBuffer);
if (count <= 0) {
return count;
}
int version = versionBuffer.get(0);
//要读取BufferByte前要先flip下
dataLengthBuffer.flip();//.flip();一定得有,如果没有,就是从最后开始读取的,当然读出来的都是byte=0时候的字符。
//通过buffer.flip();这个语句,就能把buffer的当前位置更改为buffer缓冲区的第一个位置
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
//Warning is ok since this is not supposed to happen.
LOG.warn("Incorrect header or version mismatch from " +
hostAddress + ":" + remotePort +
" got version " + version +
" expected version " + CURRENT_VERSION);
return -1;
}
dataLengthBuffer.clear();//清除内容
versionRead = true;//验证版本了
continue;
}
if (data == null) {//分配新的data
dataLengthBuffer.flip();
dataLength = dataLengthBu