RPC基本概念
RPC(Remote Procedure Call)是远程过程调用的简称,是一种常用的分布式网络通信协议。RPC要解决的问题就是,在分布式服务框架中实现不同服务节点(不同JVM上)之间的通信。
作为一个牛B的分布式系统,Hadoop实现了自己的RPC通信协议。它是Hadoop中多个分布式子系统(HDFS、Mapreduce、Hbase等)公用的网络通信模块。下面就使用Hadoop自己的RPC框架完成一个简单的RPC实例。
RPC通信过程
Hadoop使用了一个通用的RPC机制。其核心思想是,定义一个接口,让服务器端和客户端共享。客户端会使用java.reflection中的代理模式(动态代理),产生RPC接口的一个实现类。
当客户端调用接口中的方法时,比如String say(String msg),代理类会执行如下操作:
1. 序列化参数值(在本例中是msg)
2. 连接RPC服务端
3. 告诉服务端用给定的参数值去执行方法(本例中,我们会告诉服务端执行方法say,用参数msg)。
服务端进行如下操作:
1. 反序列化参数
2. 用给定的参数值执行方法(本例中,它会反序列化msg并运行say(msg))
3. 序列化方法的返回值
4. 发送返回值到客户端
一个RPC实例
使用的Hadoop版本:2.6.0
定义协议接口:
RPC协议通常为一个Java接口,定义了服务端能对外提供的服务。
@ProtocolInfo(protocolName = "sayHello",protocolVersion=123L)
public interface SayHello {
//public static final long versionID=521L;
//该方法在服务器端被调用,然后把返回值发送到客户端
String say(String msg);
}
@ProtocolInfo(protocolName = "sayHello",protocolVersion=123L)
public interface SayHello {
//public static final long versionID=521L;
//该方法在服务器端被调用,然后把返回值发送到客户端
String say(String msg);
}
这里有个非常重要的问题:为了实现向下兼容,RPC接口必须要有一个协议版本。
指定RPC协议的协议版本有三种方式:
1. RPC接口继承VersionedProtocol接口,该接口中定义了getProtocolVersion()方法,用于返回协议版本。
2. 通过注解方法,@ProtocolInfo
3. 在RPC接口中定义常量versionID
通过后两种方式定义的协议版本号,可以通过RPC的静态方法getProtocolVersion(clazz)来获取协议版本。
实现RPC接口:
public class SayHelloImpl implements SayHello{
public String say(String msg) {
System.out.println("say: "+msg);//在服务器端输出
return msg;
}
}
public class SayHelloImpl implements SayHello{
public String say(String msg) {
System.out.println("say: "+msg);//在服务器端输出
return msg;
}
}
启动服务端:
public class MyServer {
public static void main(String[] args) throws Exception {
/**
* setInstance(instance) :指定服务器端要运行的RPC协议的实现类
* setProtocol(clazz) :服务器端使用的RPC接口的版本,要与客户端使用的版本相同
*/
final RPC.Server server = new RPC.Builder(new Configuration())
.setBindAddress("127.0.0.1").setPort(22222)
.setInstance(new SayHelloImpl())
.setProtocol(SayHello.class)
.build();
server.start();
}
}
public class MyServer {
public static void main(String[] args) throws Exception {
/**
* setInstance(instance) :指定服务器端要运行的RPC协议的实现类
* setProtocol(clazz) :服务器端使用的RPC接口的版本,要与客户端使用的版本相同
*/
final RPC.Server server = new RPC.Builder(new Configuration())
.setBindAddress("127.0.0.1").setPort(22222)
.setInstance(new SayHelloImpl())
.setProtocol(SayHello.class)
.build();
server.start();
}
}
客户端:
public class MyClient {
public static void main(String[] args) throws Exception {
SayHello ping = RPC.getProxy(SayHello.class,
RPC.getProtocolVersion(SayHello.class),
new InetSocketAddress("127.0.0.1", 22222), new Configuration());
System.out.println("Server say() return value: " + ping.say("hello,world"));
}
}
public class MyClient {
public static void main(String[] args) throws Exception {
SayHello ping = RPC.getProxy(SayHello.class,
RPC.getProtocolVersion(SayHello.class),
new InetSocketAddress("127.0.0.1", 22222), new Configuration());
System.out.println("Server say() return value: " + ping.say("hello,world"));
}
}
RPC的静态方法getProxy用于为客户端构造一个RPC接口的代理对象。
RPC.getProxy(protocol,clientVersion,addr,conf)
@param protocol RPC接口
@param clientVersion 客户端使用的RPC接口的协议版本.
@param addr 服务器端的地址
@param conf
@return a proxy instance 返回RPC接口的代理对象.
RPC.getProxy(protocol,clientVersion,addr,conf)
@param protocol RPC接口
@param clientVersion 客户端使用的RPC接口的协议版本.
@param addr 服务器端的地址
@param conf
@return a proxy instance 返回RPC接口的代理对象.
RPC通信的核心说白了,就是动态代理+Socket通信。
通过动态代理类,当客户端调用RPC接口的方法时,在代理方法中,把想调用的方法签名和参数通过Socket通信发送给服务端。
服务端知道要执行的方法和方法的参数值后,执行该方法,并把方法返回值通过Socket发送给客户端。
以上的过程对客户端是透明的,就如同执行本地方法一样,完成了对远程方法的调用。
如果还不理解,我推荐看看阿里大神自己编写的一个简单RPC框架:
Hadoop中的RPC通信
在Hadoop分布式框架中,主从节点之间通过RPC协议进行通信。
主节点(NameNode或JobTracker)从不会主动向从节点(DataNode或TaskTracker)发送任何信息,而是由从节点主动通过心跳机制,周期性的向主节点发送心跳数据包,主节点也只能通过心跳应答的方式为各个从节点分配任务。这里的心跳,它实际上就是一个RPC函数。通信过程和上面的实验是一样的,只是增加了更加负责的业务逻辑。