如果大家对 RPC 有一些了解的话,或多或者都会听到过一些大名鼎鼎的 RPC 框架,比如 Dubbo、gRPC。但是大部分人对于他们底层的实现原理其实不甚了解。
成都创新互联专注为客户提供全方位的互联网综合服务,包含不限于成都网站制作、做网站、外贸营销网站建设、河源网络推广、小程序设计、河源网络营销、河源企业策划、河源品牌公关、搜索引擎seo、人物专访、企业宣传片、企业代运营等,从售前售中售后,我们都将竭诚为您服务,您的肯定,是我们最大的嘉奖;成都创新互联为所有大学生创业者提供河源建站搭建服务,24小时服务热线:18980820575,官方网址:www.cdcxhl.com
图片来自 Pexels
有一种比较好的学习方式就是:如果你想要了解一个框架的原理,你可以尝试去写一个简易版的框架出来,就比如如果你想理解 Spring IOC 的思想,最好的方式就是自己实现一个小型的 IOC 容器,自己慢慢体会。
所以本文尝试带领大家去设计一个小型的 RPC 框架,同时对于框架会保持一些拓展点。
通过阅读本文,你可以收获:
本文会依赖一些组件,他们是实现 RPC 框架必要的一些知识,文中会尽量降低这些知识带来的障碍。
但是,最好期望读者有以下知识基础:
RPC 框架应该长什么样子
我们首先来看一下:一个 RPC 框架是什么东西?我们最直观的感觉就是:
集成了 RPC 框架之后,通过配置一个注册中心的地址。一个应用(称为服务提供者)将某个接口(interface)“暴露”出去,另外一个应用(称为服务消费者)通过“引用”这个接口(interface),然后调用了一下,就很神奇的可以调用到另外一个应用的方法了
给我们的感觉就好像调用了一个本地方法一样。即便两个应用不是在同一个 JVM 中甚至两个应用都不在同一台机器中。
那他们是如何做到的呢?当我们的服务消费者调用某个 RPC 接口的方法之后,它的底层会通过动态代理,然后经过网络调用,去到服务提供者的机器上,然后执行对应的方法。
接着方法的结果通过网络传输返回到服务消费者那里,然后就可以拿到结果了。
整个过程如下图:
那么这个时候,可能有人会问了:服务消费者怎么知道服务提供者在哪台机器的哪个端口呢?
这个时候,就需要“注册中心”登场了,具体来说是这样子的:
这样一来,服务消费者就有了一份服务提供者所在的机器列表了。
“服务消费者”拿到了“服务提供者”的机器列表就可以通过网络请求来发起请求了。
网络客户端,我们应该采用什么呢?有几种选择:
“服务消费者”拿到了“服务提供者”的机器列表就可以通过网络请求来发起请求了。
作为一个有追求的程序员,我们要求开发出来的框架要求支持高并发、又要求简单、还要快。
当然是选择 Netty 来实现了,使用 Netty 的一些很基本的 API 就能满足我们的需求。
网络协议定义
当然了,既然我们要使用网络传输数据。我们首先要定义一套网络协议出来。
你可能又要问了,啥叫网络协议?网络协议,通俗理解,意思就是说我们的客户端发送的数据应该长什么样子,服务端可以去解析出来知道要做什么事情。
话不多说,上代码,假设我们现在服务提供者有两个类:
- // com.study.rpc.test.producer.HelloService
- public interface HelloService {
- String sayHello(TestBean testBean);
- }
- // com.study.rpc.test.producer.TestBean
- public class TestBean {
- private String name;
- private Integer age;
- public TestBean(String name, Integer age) {
- this.name = name;
- this.age = age;
- }
- // getter setter
- }
现在我要调用 HelloService.sayHello(TestBean testBean) 这个方法。
作为“服务消费者”,应该怎么定义我们的请求,从而让服务端知道我是要调用这个方法呢?
这需要我们将这个接口信息产生一个唯一的标识:这个标识会记录了接口名、具体是那个方法、然后具体参数是什么!
然后将这些信息组织起来发送给服务端,我这里的方式是将信息保存为一个 JSON 格式的字符串来传输。
比如上面的接口我们传输的数据大概是这样的:
- {
- "interfaces": "interface=com.study.rpc.test.producer.HelloService&method=sayHello&
- parameter=com.study.rpc.test.producer.TestBean",
- "requestId": "3",
- "parameter": {
- "com.study.rpc.test.producer.TestBean": {
- "age": 20,
- "name": "张三"
- }
- }
- }
嗯,我这里用一个 JSON 来标识这次调用是调用哪个接口的哪个方法,其中 interface 标识了唯一的类,parameter 标识了里面具体有哪些参数, 其中 key 就是参数的类全限定名,value 就是这个类的 JSON 信息。
可能看到这里,大家可能有意见了:数据不一定用 JSON 格式传输啊,而且使用 JSON 也不一定性能最高啊。
你使用 JDK 的 Serializable 配合 Netty 的 ObjectDecoder 来实现,这当然也可以,其实这里是一个拓展点,我们应该要提供多种序列化方式来供用户选择。
但是这里选择了 JSON 的原因是因为它比较直观,对于写文章来说比较合理。
开发服务提供者
嗯,搞定了网络协议之后,我们开始开发“服务提供者”了。对于服务提供者,因为我们这里是写一个简单版本的 RPC 框架,为了保持简洁。
我们不会引入类似 Spring 之类的容器框架,所以我们需要定义一个服务提供者的配置类,它用于定义这个服务提供者是什么接口,然后它具体的实例对象是什么:
- public class ServiceConfig{
- public Class type;
- public T instance;
- public ServiceConfig(Classtype, T instance) {
- this.type = type;
- this.instance = instance;
- }
- public ClassgetType() {
- return type;
- }
- public void setType(Classtype) {
- this.type = type;
- }
- public T getInstance() {
- return instance;
- }
- public void setInstance(T instance) {
- this.instance = instance;
- }
- }
有了这个东西之后,我们就知道需要暴露哪些接口了。为了框架有一个统一的入口,我定义了一个类叫做 ApplicationContext,可以认为这是一个应用程序上下文,他的构造函数,接收 2 个参数。
代码如下:
- public ApplicationContext(String registryUrl, ListserviceConfigs){
- // 1. 保存需要暴露的接口配置
- this.serviceConfigs = serviceConfigs == null ? new ArrayList<>() : serviceConfigs;
- // step 2: 实例化注册中心
- initRegistry(registryUrl);
- // step 3: 将接口注册到注册中心,从注册中心获取接口,初始化服务接口列表
- RegistryInfo registryInfo = null;
- InetAddress addr = InetAddress.getLocalHost();
- String hostname = addr.getHostName();
- String hostAddress = addr.getHostAddress();
- registryInfo = new RegistryInfo(hostname, hostAddress, port);
- doRegistry(registryInfo);
- // step 4:初始化Netty服务器,接受到请求,直接打到服务提供者的service方法中
- if (!this.serviceConfigs.isEmpty()) {
- // 需要暴露接口才暴露
- nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods);
- nettyServer.init(port);
- }
- }
注册中心设计
这里分为几个步骤,首先保存了接口配置,接着初始化注册中心,因为注册中心可能会提供多种来供用户选择,所以这里需要定义一个注册中心的接口:
- public interface Registry {
- /**
- * 将生产者接口注册到注册中心
- *
- * @param clazz 类
- * @param registryInfo 本机的注册信息
- */
- void register(Class clazz, RegistryInfo registryInfo) throws Exception;
- }
这里我们提供一个注册的方法,这个方法的语义是将 clazz 对应的接口注册到注册中心。
接收两个参数,一个是接口的 class 对象,另一个是注册信息,里面包含了本机的一些基本信息,如下:
- public class RegistryInfo {
- private String hostname;
- private String ip;
- private Integer port;
- public RegistryInfo(String hostname, String ip, Integer port) {
- this.hostname = hostname;
- this.ip = ip;
- this.port = port;
- }
- // getter setter
- }
好了,定义好注册中心,回到之前的实例化注册中心的地方,代码如下:
- /**
- * 注册中心
- */
- private Registry registry;
- private void initRegistry(String registryUrl) {
- if (registryUrl.startsWith("zookeeper://")) {
- registryUrl = registryUrl.substring(12);
- registry = new ZookeeperRegistry(registryUrl);
- } else if (registryUrl.startsWith("multicast://")) {
- registry = new MulticastRegistry(registryUrl);
- }
- }
这里逻辑也非常简单,就是根据 url 的 schema 来判断是那个注册中心,注册中心这里实现了 2 个实现类,分别使用 Zookeeper 作为注册中心,另外一个是使用广播的方式作为注册中心。
广播注册中心这里仅仅是做个示范,内部没有实现。我们主要是实现了 Zookeeper 的注册中心。
当然了,如果有兴趣,可以实现更多的注册中心供用户选择,比如 Redis 之类的,这里只是为了保持“拓展点”。
那么实例化完注册中心之后,回到上面的代码。
注册服务提供者
- // step 3: 将接口注册到注册中心,从注册中心获取接口,初始化服务接口列表
- RegistryInfo registryInfo = null;
- InetAddress addr = InetAddress.getLocalHost();
- String hostname = addr.getHostName();
- String hostAddress = addr.getHostAddress();
- registryInfo = new RegistryInfo(hostname, hostAddress, port);
- doRegistry(registryInfo);
这里逻辑很简单,就是获取本机的的基本信息构造成 RegistryInfo,然后调用了 doRegistry 方法:
- /**
- * 接口方法对应method对象
- */
- private MapinterfaceMethods = new ConcurrentHashMap<>();
- private void doRegistry(RegistryInfo registryInfo) throws Exception {
- for (ServiceConfig config : serviceConfigs) {
- Class type = config.getType();
- registry.register(type, registryInfo);
- Method[] declaredMethods = type.getDeclaredMethods();
- for (Method method : declaredMethods) {
- String identify = InvokeUtils.buildInterfaceMethodIdentify(type, method);
- interfaceMethods.put(identify, method);
- }
- }
- }
这里做了两件事情:
下面分别分析这两件事情,首先是注册方法:因为我们用到了 Zookeeper,为了方便,引入了 Zookeeper 的客户端框架 Curator。
org.apache.curatorgroupId> curator-recipesartifactId> 2.3.0version> - dependency>
接着看代码:
- public class ZookeeperRegistry implements Registry {
- private CuratorFramework client;
- public ZookeeperRegistry(String connectString) {
- RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
- client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
- client.start();
- try {
- Stat myRPC = client.checkExists().forPath("/myRPC");
- if (myRPC == null) {
- client.create()
- .creatingParentsIfNeeded()
- .forPath("/myRPC");
- }
- System.out.println("Zookeeper Client初始化完毕......");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- @Override
- public void register(Class clazz, RegistryInfo registryInfo) throws Exception {
- // 1. 注册的时候,先从zk中获取数据
- // 2. 将自己的服务器地址加入注册中心中
- // 为每一个接口的每一个方法注册一个临时节点,然后key为接口方法的唯一标识,data为服务地址列表
- Method[] declaredMethods = clazz.getDeclaredMethods();
- for (Method method : declaredMethods) {
- String key = InvokeUtils.buildInterfaceMethodIdentify(clazz, method);
- String path = "/myRPC/" + key;
- Stat stat = client.checkExists().forPath(path);
- ListregistryInfos;
- if (stat != null) {
- // 如果这个接口已经有人注册过了,把数据拿回来,然后将自己的信息保存进去
- byte[] bytes = client.getData().forPath(path);
- String data = new String(bytes, StandardCharsets.UTF_8);
- registryInfos = JSONArray.parseArray(data, RegistryInfo.class);
- if (registryInfos.contains(registryInfo)) {
- // 正常来说,zk的临时节点,断开连接后,直接就没了,但是重启会经常发现存在节点,所以有了这样的代码
- System.out.println("地址列表已经包含本机【" + key + "】,不注册了");
- } else {
- registryInfos.add(registryInfo);
- client.setData().forPath(path, JSONArray.toJSONString(registryInfos).getBytes());
- System.out.println("注册到注册中心,路径为:【" + path + "】 信息为:" + registryInfo);
- }
- } else {
- registryInfos = new ArrayList<>();
- registryInfos.add(registryInfo);
- client.create()
- .creatingParentsIfNeeded()
- // 临时节点,断开连接就关闭
- .withMode(CreateMode.EPHEMERAL)
- .forPath(path, JSONArray.toJSONString(registryInfos).getBytes());
- System.out.println("注册到注册中心,路径为:【" + path + "】 信息为:" + registryInfo);
- }
- }
- }
- }
Zookeeper 注册中心在初始化的时候,会建立好连接。然后注册的时候,针对 clazz 接口的每一个方法,都会生成一个唯一标识。
这里使用了InvokeUtils.buildInterfaceMethodIdentify方法:
- public static String buildInterfaceMethodIdentify(Class clazz, Method method) {
- Map
map = new LinkedHashMap<>(); - map.put("interface", clazz.getName());
- map.put("method", method.getName());
- Parameter[] parameters = method.getParameters();
- if (parameters.length > 0) {
- StringBuilder param = new StringBuilder();
- for (int i = 0; i < parameters.length; i++) {
- Parameter p = parameters[i];
- param.append(p.getType().getName());
- if (i < parameters.length - 1) {
- param.append(",");
- }
- }
- map.put("parameter", param.toString());
- }
- return map2String(map);
- }
- public static String map2String(Map
map) { - StringBuilder sb = new StringBuilder();
- Iterator
String>> iterator = map.entrySet().iterator(); - while (iterator.hasNext()) {
- Map.Entry
entry = iterator.next(); - sb.append(entry.getKey() + "=" + entry.getValue());
- if (iterator.hasNext()) {
- sb.append("&");
- }
- }
- return sb.toString();
- }
其实就是对接口的方法使用他们的限定名和参数来组成一个唯一的标识,比如 HelloService#sayHello(TestBean) 生成的大概是这样的:
- interface=com.study.rpc.test.producer.HelloService&method=sayHello&
- parameter=com.study.rpc.test.producer.TestBean
接下来的逻辑就简单了,在 Zookeeper 中的 /myRPC 路径下面建立临时节点,节点名称为我们上面的接口方法唯一标识,数据内容为机器信息。
之所以采用临时节点是因为:如果机器宕机了,连接断开之后,消费者可以通过 Zookeeper 的 watcher 机制感知到。
大概看起来是这样的:
- /myRPC/interface=com.study.rpc.test.producer.HelloService&method=sayHello&
- parameter=com.study.rpc.test.producer.TestBean
- [
- {
- "hostname":peer1,
- "port":8080
- },
- {
- "hostname":peer2,
- "port":8081
- }
- ]
通过这样的方式,在服务消费的时候就可以拿到这样的注册信息,然后知道可以调用那台机器的那个端口。
好了,注册中心弄完了之后,我们回到前面说的注册方法做的第二件事情,我们将每一个接口方法标识的方法放入了一个 map 中:
- /**
- * 接口方法对应method对象
- */
- private Map
interfaceMethods = new ConcurrentHashMap<>();
这个的原因是因为,我们在收到网络请求的时候,需要调用反射的方式调用 Method 对象,所以存起来。
启动网络服务端接受请求
接下来我们就可以看第四步了:
- // step 4:初始化Netty服务器,接受到请求,直接打到服务提供者的service方法中
- if (!this.serviceConfigs.isEmpty()) {
- // 需要暴露接口才暴露
- nettyServer = new NettyServer(this.serviceConfigs, interfaceMethods);
- nettyServer.init(port);
- }
因为这里使用 Netty 来做的所以需要引入 Netty 的依赖:
io.nettygroupId> netty-allartifactId> 4.1.30.Finalversion> - dependency>
接着来分析:
- public class NettyServer {
- /**
- * 负责调用方法的handler
- */
- private RpcInvokeHandler rpcInvokeHandler;
- public NettyServer(ListserverConfigs, MapinterfaceMethods)throws InterruptedException {
- this.rpcInvokeHandler = new RpcInvokeHandler(serverConfigs, interfaceMethods);
- }
- public int init(int port) throws Exception {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- .childHandler(new ChannelInitializer(){
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ByteBuf delimiter = Unpooled.copiedBuffer("$$");
- // 设置按照分隔符“&&”来切分消息,单条消息限制为 1MB
- ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 1024, delimiter));
- ch.pipeline().addLast(new StringDecoder());
- ch.pipeline().addLast().addLast(rpcInvokeHandler);
- }
- });
- ChannelFuture sync = b.bind(port).sync();
- System.out.println("启动NettyService,端口为:" + port);
- return port;
- }
- }
这部分主要的都是 Netty 的 API,我们不做过多的说明,就简单的说一下:
- public class RpcInvokeHandler extends ChannelInboundHandlerAdapter {
- /**
- * 接口方法唯一标识对应的Method对象
- */
- private Map
interfaceMethods; - /**
- * 接口对应的实现类
- */
- private Map
interfaceToInstance; - /**
- * 线程池,随意写的,不要吐槽
- */
- private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,
- 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
- new ThreadFactory() {
- AtomicInteger m = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "IO-thread-" + m.incrementAndGet());
- }
- });
- public RpcInvokeHandler(ListserviceConfigList,
- Map
interfaceMethods) { - this.interfaceToInstance = new ConcurrentHashMap<>();
- this.interfaceMethods = interfaceMethods;
- for (ServiceConfig config : serviceConfigList) {
- interfaceToInstance.put(config.getType(), config.getInstance());
- }
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- try {
- String message = (String) msg;
- // 这里拿到的是一串JSON数据,解析为Request对象,
- // 事实上这里解析网络数据,可以用序列化方式,定一个接口,可以实现JSON格式序列化,或者其他序列化
- // 但是demo版本就算了。
- System.out.println("接收到消息:" + msg);
- RpcRequest request = RpcRequest.parse(message, ctx);
- threadPoolExecutor.execute(new RpcInvokeTask(request));
- } finally {
- ReferenceCountUtil.release(msg);
- }
- }
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- ctx.flush();
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- System.out.println("发生了异常..." + cause);
- cause.printStackTrace();
- ctx.close();
- }
- public class RpcInvokeTask implements Runnable {
- private RpcRequest rpcRequest;
- RpcInvokeTask(RpcRequest rpcRequest) {
- this.rpcRequest = rpcRequest;
- }
- @Override
- public void run() {
- try {
- /*
- * 数据大概是这样子的
- * {"interfaces":"interface=com.study.rpc.test.producer.HelloService&method=sayHello¶meter=com
- * .study.rpc.test.producer.TestBean","requestId":"3","parameter":{"com.study.rpc.test.producer
- * .TestBean":{"age":20,"name":"张三"}}}
- */
- // 这里希望能拿到每一个服务对象的每一个接口的特定声明
- String interfaceIdentity = rpcRequest.getInterfaceIdentity();
- Method method = interfaceMethods.get(interfaceIdentity);
- Map
map = string2Map(interfaceIdentity); - String interfaceName = map.get("interface");
- Class interfaceClass = Class.forName(interfaceName);
- Object o = interfaceToInstance.get(interfaceClass);
- String parameterString = map.get("parameter");
- Object result;
- if (parameterString != null) {
- String[] parameterTypeClass = parameterString.split(",");
- Map
parameterMap = rpcRequest.getParameterMap(); - Object[] parameterInstance = new Object[parameterTypeClass.length];
- for (int i = 0; i < parameterTypeClass.length; i++) {
- String parameterClazz = parameterTypeClass[i];
- parameterInstance[i] = parameterMap.get(parameterClazz);
- }
- result = method.invoke(o, parameterInstance);
- } else {
- result = method.invoke(o);
- }
- // 写回响应
- ChannelHandlerContext ctx = rpcRequest.getCtx();
- String requestId = rpcRequest.getRequestId();
- RpcResponse response = RpcResponse.create(JSONObject.toJSONString(result), interfaceIdentity,
- requestId);
- String s = JSONObject.toJSONString(response) + "$$";
- ByteBuf byteBuf = Unpooled.copiedBuffer(s.getBytes());
- ctx.writeAndFlush(byteBuf);
- System.out.println("响应给客户端:" + s);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static Map
string2Map(String str) { - String[] split = str.split("&");
- Map
map = new HashMap<>(16); < 分享文章:神烦,老大要我写一个RPC框架!
网页网址:http://www.36103.cn/qtweb/news45/9795.html网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联