第05篇:手写JavaRPC框架之执行层思路

举报
西魏陶渊明 发表于 2022/09/25 00:29:47 2022/09/25
【摘要】 作者: 西魏陶渊明 博客: https://blog.springlearn.cn/ 天下代码一大抄, 抄来抄去有提高, 看你会抄不会抄! 一、思路分析 通过前四篇文章的一起...

作者: 西魏陶渊明

博客: https://blog.springlearn.cn/

天下代码一大抄, 抄来抄去有提高, 看你会抄不会抄!

一、思路分析

通过前四篇文章的一起 Coding, 我们已经完成了 30%的工作,即完成了一个通信层的搭建。在不依赖任何 web 容器的情况下,可以实现服务之间的通信工作。就像下面这样。

客户端&服务端通信

    /**
     * @author liuxin
     * 个人博客:https://java.springlearn.cn
     * 公众号:西魏陶渊明  {关注获取学习源码}
     * 2022/8/11 23:12
     */
    @Test
    @DisplayName("构建服务端【阻塞方式】")
    public void server() throws Exception {
       Mojito.server(RpcRequest.class, RpcResponse.class)
                // 业务层,读取请求对象,返回结果
                .businessHandler((channelContext, request) -> new RpcResponse())
                .create().start(666);
    }

    @Test
    @DisplayName("构建客户端【同步方式】")
    public void clientSync() throws Exception{
        Client<RpcRequest, RpcResponse> client = Mojito.client(RpcRequest.class, RpcResponse.class)
                .connect("127.0.0.1", 6666);
        System.out.println(client.send(new RpcRequest()));
    }

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

这只是完成了通信, 就好比,两台服务器之间建立了沟通管道,但是究竟怎么用这个管道呢? 如何将客户端的请求参数发送到服务端的服务器上执行结果,并返回给客户端呢? 这就是本篇文章要讨论的话题。

暂且我们把这一层叫做 RPC 执行层吧。按照老套路,在真正开始 Coding 之前,我们先梳理一下逻辑,画一个最基础的图。

  • 左边客户端一个方法, 有 4 个服务端的实现。
  • 右边某个服务端有一个具体的实现。

左边客户端的接口,通过代理的方式,将客户端的参数通过网络管道传输给某台服务端的本地进行执行。执行后获取结果,返回给客户端的调用方。而这些都是通过代理的方式实现的,所以开发者就好像调用本地方法一样。实现一次远程方法的调用。

以上是最最基础的远程调用的过程,但是如果就这的话就太基础了,下面我们会在这个基础上去做更多的事情。

1.1 客户端实现思路

RPC 服务,服务端会给客户端提供一个 API 包,这个包里面没有具体的实现,但是客户端能直接进行调用。学习 Java 的都知道接口是不能实例化的,但是为什么服务端给我们的 API,我们能直接调用呢? 当然是代理了。所以我们要学的第一个东西,就是学会代理。

  1. 使用代理实现接口的实例化。【代理是我们必须要掌握的】

但是代理层里面怎么做呢?

  1. 代理层将要调用的远程类和当前的客户端的参数,进行封装,然后通过通信层发送给服务端进行直接,然后拿到结果返回。这里面可能就设计到异步转同步的问题,但是没关系我们通信层直接就提供了实现。

以上就是客户端最基础的功能了。但是我们不满足于此, 市面上 RPC 框架有的功能,我们也要有。比如那些呢?
负载均衡、容错策略、事件广播我们也要有。那么他们究竟怎么做的呢?

  1. 负载均衡,我们看上面客户端的图,会发现这个服务有 4 个实现 [172.168.10.1,172.168.10.2,172.168.10.3,172.168.10.4],说明服务端可能是集群部署的。那么既然有 4 个实例,我们就不能尽管这,一个 ip 进行进行调用,那么如何尽量让每个实例都能收到请求呢? 这就是负载均衡。

  2. 容错策略,当我们调用一台实例出错了,直接报错? 还是重试一下请求?或者是换一个实例在请求呢? 这就是容错策略。甚至我们还可以实现一个熔断器。

这里不会设计也没关系,我们会抄,哦,不对是借鉴。以下是dubbo的容错策略。

  1. 既然客户端有这个服务的所有实例信息,那么是不是不仅可以进行点对点的请求,还是能进行广播。

这五点就是客户端要具备的基础能力。

1.2 服务端实现思路

服务端我们就可以参考 Web 容器的思路,比如 Servle 如何处理请求呢? http 过来就是一个 url,如何匹配到要执行的方法的呢?

  1. 服务启动将 API 实现保存到 Map 接口中。
  2. 服务收到请求,Dispatch 根据请求信息从 map 中获取执行器。
  3. 通过反射获取执行结果,返回给调用方。

同时将自己的信息注册到注册中心,让客户端能注册中心发现自己。

二、技术储备

要实现上面的想法,究竟需要哪些技术储备呢?

2.1 代理模式

上面我们所说的负载均衡,容错策略,广播都要在代理中实现。

2.1.1 JDK 代理

JDK 代理必须要实现接口,第二个参数是接口数组,第三个参数是InvocationHandler。属于比较基础的内容。

public class ProxyFactory {

    public interface User {
        String queryName();
    }

    public static void main(String[] args) {
        User user = (User) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{User.class}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (method.getName().equals("queryName")) {
                    return "Jay";
                } else {
                    return null;
                }
            }
        });
        System.out.println(user.queryName());
    }
}

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2.1.2 Cglib 代理

Cglib 代理,代理类不用有接口。

    public static class Person {

        public String queryName() {
            return "jay";
        }
    }

    public static void main(String[] args) {
        Person person = (Person) Enhancer.create(Person.class, new MethodInterceptor() {
            @Override
            public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
                if (method.getName().equals("queryName")) {
                    return "代理:Jay";
                } else {
                    return null;
                }
            }
        });
        System.out.println(person.queryName());
    }

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

2.2 负载均衡策略

2.2.1 随机算法

public static String random(List<String> services) {
        Random random = new Random();
        String[] addressArr = services.toArray(new String[0]);
        // random
        return addressArr[random.nextInt(services.size())];
 }

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2.2.2 轮训算法

public class RoundBalanceTest {

    public static void main(String[] args) {
        List<String> services = Arrays.asList("service1", "service2", "service3");
        XxlBalanceTest.manyRoute(i -> {
            // 请求次数,取模。serviceKey 可以更细粒度的控制轮训
            ColorConsole.colorPrintln("轮训负载({}):{}", i, round(services));
        });
    }

    private static final AtomicInteger atomicInteger = new AtomicInteger();

    private static String round(List<String> services) {
        int count = atomicInteger.get();
        if (count >= Integer.MAX_VALUE) {
            atomicInteger.set(0);
        }
        atomicInteger.incrementAndGet();
        String[] toArray = services.toArray(new String[0]);
        return toArray[count % toArray.length];
    }
}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

2.2.3 加权算法

加权算法的有很多的变异算法, 可以通过配置的方式,也可以通过某种策略动态的给每台服务器进行加权,从而来提高被轮训到的次数。 这里说两种网上常见的实现。

简单加权算法
一个简单暴力的加权算法,如下图。按照权重,重新构建集合。然后再将集合进行取模轮训即可。即可实现一个最简单 的加权算法。

代码实现也是比较简单的,如下代码。

public class WeightBalanceTest {

    private static class Server {

        private String host;

        private Integer port;

        public Server(String host, Integer port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return "Server{" +
                    "host='" + host + '\'' +
                    ", port=" + port +
                    '}';
        }
    }

    private static final AtomicInteger atomicInteger = new AtomicInteger();

    public static Server round(List<Server> services) {
        int count = atomicInteger.get();
        if (count >= Integer.MAX_VALUE) {
            atomicInteger.set(0);
        }
        atomicInteger.incrementAndGet();
        Server[] toArray = services.toArray(new Server[0]);
        return toArray[count % toArray.length];
    }

    public static void main(String[] args) {
        Map<Server, Integer> confWeight = new HashMap<>();
        confWeight.put(new Server("127.0.0.1", 80), 2);
        confWeight.put(new Server("127.0.0.1", 81), 3);
        confWeight.put(new Server("127.0.0.1", 82), 5);

        List<Server> servers = new ArrayList<>();
        for (Map.Entry<Server, Integer> entity : confWeight.entrySet()) {
            Server server = entity.getKey();
            Integer weight = entity.getValue();
            for (int i = 0; i < weight; i++) {
                servers.add(server);
            }
        }
        Loops.loop(10, i -> {
            ColorConsole.colorPrintln("第{}次,权重轮训{}", i, round(servers));
        });
    }

}0,权重轮训Server{host='127.0.0.1', port=80}1,权重轮训Server{host='127.0.0.1', port=80}2,权重轮训Server{host='127.0.0.1', port=82}3,权重轮训Server{host='127.0.0.1', port=82}4,权重轮训Server{host='127.0.0.1', port=82}5,权重轮训Server{host='127.0.0.1', port=82}6,权重轮训Server{host='127.0.0.1', port=82}7,权重轮训Server{host='127.0.0.1', port=81}8,权重轮训Server{host='127.0.0.1', port=81}9,权重轮训Server{host='127.0.0.1', port=81}


  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

但这样还是不均匀的, 相同的 ip 可能被连续的访问到其实就没有做到负载均衡。

平滑加权算法

主要解决上面那种不平滑的方案。这种方案是由 nginx (opens new window)提出来的。 算法的数学原理。

  • 最大权重,减总权重
  • 当前权重加上原权重
    如下权重变化。
轮数 选择前的当前权重 选择节点 选择后的当前权重
1 {5, 1, 1} a {-2, 1, 1}
2 {3, 2, 2} a {-4, 2, 2}
3 {1, 3, 3} b {1, -4, 3}
4 {6, -3, 4} a {-1, -3, 4}
5 {4, -2, 5} c {4, -2, -2}
6 {9, -1, -1} a {2, -1, -1}
7 {7, 0, 0} a {0, 0, 0}

下面我们通过代码来实现。

  • 首先我们定义出服务器模型, weight 是初始配置的权重,currentWeight 是计算后的权重。
  • 初始值 weight = currentWeight
    @Data
    @AllArgsConstructor
    @ToString
    @EqualsAndHashCode
    private static class Server {
        private String host;
        private Integer port;
        // 初始化权重
        private Integer weight;
        // 计算后的当前权重
        private Integer currentWeight;
    }

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

然后我们根据算法的核心点来选择节点。这里我们先不考虑性能只说思路,有了思路在自己来优化代码。

  1. line(3-6) 首先获取总权重
  2. line(8-14) 然后获取当前最大权重的节点
  3. line(16-21) 重新计算权重(主要使用算法的思想)
    • 当前最大权重节点,重新计算权重。当前权重 = 当前权重 - 总权重 + 原始权重
    • 其他节点,重新计算权重。当前权重 = 当前权重 + 原始权重
    public static Server selectServer(List<Server> servers) {
        // 获取总权重
        Integer totalWeight = 0;
        for (Server server : servers) {
            totalWeight += server.getWeight();
        }
        // 根据权重从小到大排序
        List<Server> sortByCurrentWeight = servers.stream().sorted(Comparator.comparing(Server::getCurrentWeight))
                .collect(Collectors.toList());
        // 集合反转,从大到小排序
        Collections.reverse(sortByCurrentWeight);
        // 当前最大权重
        Server maxWeightServer = sortByCurrentWeight.get(0);
        // 重新计算权重
        for (Server server : servers) {
            if (server.equals(maxWeightServer)) {
                server.setCurrentWeight(server.getCurrentWeight() - totalWeight);
            }
            server.setCurrentWeight(server.getCurrentWeight() + server.getWeight());
        }
        return maxWeightServer;
    }

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

可以看到非常的平滑均匀,每个 ip 都会被分散。

第0次,平滑权重轮训WeightBalanceTest2.Server(host=127,0,0,1, port=8080, weight=4, currentWeight=1)
第1次,平滑权重轮训WeightBalanceTest2.Server(host=127,0,0,1, port=8081, weight=2, currentWeight=-1)
第2次,平滑权重轮训WeightBalanceTest2.Server(host=127,0,0,1, port=8080, weight=4, currentWeight=2)
第3次,平滑权重轮训WeightBalanceTest2.Server(host=127,0,0,1, port=8082, weight=1, currentWeight=-2)
第4次,平滑权重轮训WeightBalanceTest2.Server(host=127,0,0,1, port=8080, weight=4, currentWeight=3)
第5次,平滑权重轮训WeightBalanceTest2.Server(host=127,0,0,1, port=8081, weight=2, currentWeight=0)
第6次,平滑权重轮训WeightBalanceTest2.Server(host=127,0,0,1, port=8080, weight=4, currentWeight=4)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

很难想象,手写一个 RPC 框架,执行层所需要的技术储备只有这些而已。可以看出来,重要的是思路,思路决定出路。好了,下面我们就来实现这些思路吧。

那么你准备好跟我一起 Coding 了吗?

文章来源: springlearn.blog.csdn.net,作者:西魏陶渊明,版权归原作者所有,如需转载,请联系作者。

原文链接:springlearn.blog.csdn.net/article/details/126495375

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。