Netty实战 -- 使用Netty实现分布式框架Dubbo RPC
📢📢📢📣📣📣
哈喽!大家好,我是【Bug 终结者】 ,【CSDN新星创作者】🏆,阿里云技术博主🏆,51CTO人气博主🏆,INfoQ写作专家🏆
一位上进心十足,拥有极强学习力的【Java领域博主】😜😜😜
🏅【Bug 终结者】博客的领域是【面向后端技术】的学习,未来会持续更新更多的【后端技术】以及【学习心得】。 偶尔会分享些前端基础知识,会更新实战项目,面向企业级开发应用!
🏅 如果有对【后端技术】、【前端领域】感兴趣的【小可爱】,欢迎关注【Bug 终结者】💞💞💞
❤️❤️❤️ 感谢各位大可爱小可爱! ❤️❤️❤️
@[TOC]
⚡Netty系列文章
⚡Dubbo系列文章
Spring Boot 整合Dubbo + Zookeeper 实现分布式 消费者与服务者的业务调用
一、什么是RPC?
RPC【Remote Procedure Call】是指远程过程调用,是一种进程间通信方式,他是一种技术的思想,而不是规范。它允许程序调用另一个地址空间(通常是共享网络的另一台机器上)的过程或函数,而不用程序员显式编码这个远程调用的细节。即程序员无论是调用本地的还是远程的函数,本质上编写的调用代码基本相同。
常见的RPC框架有: 比较知名的阿里Dubbo、Google的GRPC、Go的RPCX、Apache的Thrift,SpringCloud
1.1 RPC基本原理
两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样
示意图
RPC两个核心模块:序列化和通讯
1.2 RPC执行流程
在RPC中,Client叫做服务消费者,Server叫做服务提供者
RPC调用流程说明
- 服务消费方(client),以本地调用方式调用服务
- client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
- client stub 将消息进行编码并发送到服务器
- server stub 收到消息后进行解码
- server stub 根据解码结果调用本地API
- 本地服务 执行并返回结果给 server stub
- server stub 将返回结果导入进行编码并发送至消费方
- client stub 接收到数据进行解码
- 服务消费方得到结果
RPC的目标就是将2~8的步骤封装起来,用户无需关注这些细节,可以像调用本地方法一样即可完成远程服务调用
图解
二、什么是代理模式?
代理模式的定义:代理模式给某一个对象提供一个代理对象,并由代理对象控制对原对象的引用。通俗的来讲代理模式就是我们生活中常见的中介。
Java中代理模式分为静态代理和动态代理模式
2.1 案例实现
中午到了,小明很饿,于是在美团外卖上点了一份烤鸭,过了半个小时,外卖到了,小明下去拿外卖,顺利的吃上了烤鸭~
2.2 静态代理方式
由程序员手动创建代理类或工具对象,从而实现调用服务
Subject类
package com.wanshi.netty.dubborpc.netty;
public interface Subject {
String buy(String msg);
}
SubjectImpl类
package com.wanshi.netty.dubborpc.netty;
public class SubjectImpl implements Subject{
@Override
public String buy(String msg) {
return "买了" + msg;
}
}
ProxySubject类
package com.wanshi.netty.dubborpc.netty;
public class ProxySubject {
private Subject subject;
{
subject = new SubjectImpl();
}
public void buy(String msg) {
System.out.println("美团外卖,使命必达,跑腿代买!");
String buy = subject.buy(msg);
System.out.println(buy);
}
}
测试类
public static void main(String[] args) {
ProxySubject subject = new ProxySubject();
subject.buy("北京烤鸭");
}
效果
⛽静态代理的优缺点
缺点: 不利于扩展,调用一次就要创建一次对象,从而造成不必要的内存空间浪费
优点: 可读性好,逻辑简单,清晰明了
2.3 动态代理方式
动态代理又分为:JDK动态代理、CGLIB动态代理
在程序运行时,运用反射机制动态创建而成,达到调用服务
使用以上的Subject类和实现类
SubjectInvocationHandler处理器类
package com.wanshi.netty.dubborpc.netty;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class SubjectInvocationHandler implements InvocationHandler {
private Object obj;
public SubjectInvocationHandler(Object obj) {
this.obj = obj;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object res = method.invoke(obj, args);
return res;
}
}
ProxyFactry工厂类
package com.wanshi.netty.dubborpc.netty;
import java.lang.reflect.Proxy;
public class ProxyFactry {
public static Subject getInstance() {
SubjectImpl subject = new SubjectImpl();
System.out.println("美团外卖,使命必达,跑腿代买!");
SubjectInvocationHandler subjectInvocationHandler = new SubjectInvocationHandler(subject);
Subject proxy = (Subject) Proxy.newProxyInstance(subject.getClass().getClassLoader(),
subject.getClass().getInterfaces(), subjectInvocationHandler);
return proxy;
}
}
测试类
public static void main(String[] args) {
Subject subject = ProxyFactry.getInstance();
String buy = subject.buy("饮料");
System.out.println(buy);
}
效果
⛽动态代理的优缺点
两种动态代理对照表
JDK原生动态代理 | CGLIB动态代理 | |
---|---|---|
核心原理 | 基于 ”接口实现“方式 | 基于类集成方式 |
优点 | Java原生支持的,不需要任何依赖 | 对与代理的目标对象无限制,无需实现接口 |
不足之处 | 只能基于接口实现 | 无法处理final方法 |
实现方式 | Java原生支持 | 需要引入Jar文件依赖 |
三、Netty实现DubboRPC
3.1 需求说明
- dubbo 底层使用了Netty作为网络通讯框架,要求用Netty实现一个简单的RPC框架
- 模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据,底层使用Netty框架
3.2 剖析需求
- 创建接口,定义抽象方法,用于服务消费者与服务提供者之间的约定
- 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据
- 创建一个消费者,该类需要 透明的调用自己不存在的方法,内部使用Netty 请求提供者返回数据
3.3 效果图
3.4 核心源码
♻️共用接口API
HelloService
package com.wanshi.netty.dubborpc.publicinterface;
/**
* 公共接口,提供服务
*/
public interface HelloService {
String hello(String msg);
}
♻️服务提供者
ServerBootstrap启动类
package com.wanshi.netty.dubborpc.provider;
import com.wanshi.netty.dubborpc.netty.NettyServer;
/**
* 服务提供者启动类,监听消费者,并绑定端口8888
*/
public class ServerBootstrap {
public static void main(String[] args) {
NettyServer.startServer("127.0.0.1", 8888);
}
}
NettyServer
package com.wanshi.netty.dubborpc.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* Netty服务器类,启动服务
*/
public class NettyServer {
/**
* 开启服务方法,调用内部私有启动服务方法,此类写法很常用,进一层的封装了API
* @param hostName
* @param port
*/
public static void startServer(String hostName, int port) {
startServer0(hostName, port);
}
/**
* 真正启动服务的方法
* @param hostName
* @param port
*/
private static void startServer0(String hostName, int port) {
//创建2个线程,一个为主线程仅创建1个,另外创建工作线程CPU核数*2个
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//服务器启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
//初始化参数
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println("服务端提供服务准备就绪...");
//绑定端口,启动服务,异步执行方法
ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//优雅的关闭线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
♻️服务消费者
ClientBootstrap
package com.wanshi.netty.dubborpc.consumer;
import com.wanshi.netty.dubborpc.netty.NettyClient;
import com.wanshi.netty.dubborpc.publicinterface.HelloService;
public class ClientBootstrap {
public static final String providerName = "hello#";
public static void main(String[] args) throws InterruptedException {
NettyClient client = new NettyClient();
HelloService service = (HelloService) client.getBean(HelloService.class, providerName);
for (;;) {
Thread.sleep(2000);
String hello = service.hello("你好鸭 dubbo~");
System.out.println("服务端返回的结果:" + hello + "\n\n");
}
}
}
NettyClient
package com.wanshi.netty.dubborpc.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NettyClient {
//创建线程池,大小为CPU核数*2
private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
//将客户端处理器提升至全局变量
private NettyClientHandler clientHandler;
//记录调用服务的次数
private int count;
/**
* 代理对象,执行方法,这里用到了代理模式
* @param serviceClass
* @param providerName
* @return
*/
public Object getBean(final Class<?> serviceClass, String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{serviceClass}, (proxy, method, args) -> {
System.out.println("(proxy, method, args) 进入,第" + (++count) + "次调用远程服务");
if (clientHandler == null) {
initClient();
}
clientHandler.setParam(providerName + args[0]);
return executorService.submit(clientHandler).get();
});
}
/**
* 初始化客户端
*/
public void initClient() {
clientHandler = new NettyClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(clientHandler);
}
});
try {
bootstrap.connect("127.0.0.1", 8888).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
四、源码下载
本教程已上传至GitHub代码托管平台,希望点个Star鸭~
⛵小结
以上就是【Bug 终结者】对基于Netty实现Dubbo RPC 简单的概述,手写Dubbo RPC,代码有点难度,但坚持的啃下来,多敲上几遍,熟能生巧,加油,相信你会对RPC有一个新的理解,加油,编程路上,你我都是追梦人~
如果这篇【文章】有帮助到你,希望可以给【Bug 终结者】点个赞👍,创作不易,如果有对【后端技术】、【前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【Bug 终结者】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!
- 点赞
- 收藏
- 关注作者
评论(0)