08、Netty学习笔记—(基于聊天业务:RPC实现)
@[toc]
netty笔记汇总:Netty学习指南(资料、文章汇总)
根据黑马程序员netty视频教程学习所做笔记,部分内容图例来源黑马笔记
笔记demo案例仓库地址: Github-【netty-learn】、Gitee-【netty-learn】
前言
源码地址:netty-learn,目录下的netty-聊天室
项目,该rpc调用实现是基于聊天室的协议及编解码器处理设定之上。
踩坑点
1、使用gson进行json序列化时,无法序列化反序列化 Class 类型,报错!
问题复现
System.out.println(new Gson().toJson(String.class));
解决方案
针对于Class类型进行自定义适配器:
class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {
@Override
public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
try {
String str = json.getAsString();
return Class.forName(str);
} catch (ClassNotFoundException e) {
throw new JsonParseException(e);
}
}
@Override // String.class
public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
// class -> json
return new JsonPrimitive(src.getName());
}
}
在gson实例化时将其添加入其中:
public enum SerializerAlgorithm implements Serializer{
Json{
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new SerializerAlgorithm.ClassCodec()).create();//注册!
String json = new String(bytes, StandardCharsets.UTF_8);
return gson.fromJson(json, clazz);
}
@Override
public <T> byte[] serialize(T object) {
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new SerializerAlgorithm.ClassCodec()).create();
String json = gson.toJson(object);
return json.getBytes(StandardCharsets.UTF_8);
}
};
}
一、RPC实现细节简述(直击要点)
若是有网络通讯的需求,那么就可以使用netty。对于RPC框架、消息队列的底层就是使用的netty。
简化RPC框架实现部分技术点说明(netty应用)
1、请求、响应对象封装来进行RPC调用,最终返回结果、异常信息。
- ①处理半包、黏包问题(LTC解码器,参数根据自定义的协议);②实现编解码器(根据指定的协议进行编解码操作,MessageToMessageCodec<ByteBuf, ?>);③编写能够处理指定特定感兴趣的某个请求、响应对象handler进行单独处理(SimpleChannelInboundHandler<?>)
2、针对于所需要远程调用的方法想要像调用本地方法一样,需要实现一个代理类来取到指定要调用的类、方法、请求参数…封装成请求对象通过netty的channel(单例)发送给服务器!
- ①JDK动态代理;②单例模式(双重检测锁)获取channel。
3、客户端接收:
①调用方法(间接动态代理执行)时肯定是主线程来进行执行的,而响应接收到数据是Eventloop中的线程来使用handler进行处理。那么调用方法线程如何取到进行响应对象处理线程拿到的结果值呢?这里的话就涉及到线程通信了。
- 解:通过使用netty中的promise来进行处理线程通信问题(promise提供了阻塞等待,多个线程中取到容器值的api)。
②若是进行多次远程调用方法,如何使调用方法的线程来准确的取到其他线程处理响应请求中的值?
- 解:通过一个分布式id(使用一个并发计数器,确保唯一),对于promise保存可以使用一个map(key为分布式id,value为promise),请求对象中包含该id,服务器响应时,将id再次封装到响应对象里,接着客户端在接收响应时可以拿到id,由此再根据该id取到map中的指定primise对象(取promise时为空间最大化可以直接调用remove取)。
③远程调用的方法怎么才能知道何时得到响应?
- 解:使用promise的await()来阻塞等待promise取到结果,一旦在响应对象接收到时来对promise进行setSuccess或setFailure就可以调用线程阻塞结束并继续向下执行,通过isSuccess()…API来间接根据结果进行相应操作!
4、服务端异常调用处理情况:远程调用方法若是出现异常应该将完整的错误信息可以记录到日志中,而不是直接一股脑的发给客户端,传回客户端的应该是异常的简略信息,使用一个Exception来封装一下简略异常描述返回即可。
扩展:
- 企业里一般都是把API接口单独作为一个二方包,其他人要调用你的方法时,只需要引用你的二方包即可。
- 泛型使用通配符?只能取,不能存,这是语法上的要求(所以若是要存就设置Object),但是有个特例就是null可以放!
二、具体代码实现
Message
说明:某个业务的请求、响应对象继承该Message抽象类。
import com.changlu.message.rpc.RpcRequestMessage;
import com.changlu.message.rpc.RpcResponseMessage;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@Data
public abstract class Message implements Serializable {
public static Class<?> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}
private int sequenceId;//序列号id(唯一)
private int messageType;
public abstract int getMessageType();
//...
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
private static final Map<Integer, Class<?>> messageClasses = new HashMap<>();
static {
//...
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
}
客户端
RpcRequestMessage
说明:进行远程调用时传输的请求对象封装。
import com.changlu.message.Message;
import lombok.Data;
import lombok.ToString;
/**
* @ClassName RpcRequestMessage
* @Author ChangLu
* @Date 2022/1/16 21:38
* @Description RPC消息请求对象
*/
@Data
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {
/**
* 接口全限定名
*/
private String interfaceName;
/**
* 调用接口的方法名
*/
private String methodName;
/**
* 方法返回类型
*/
private Class<?> returnType;
/**
* 方法参数类型数组
*/
private Class[] parameterTypes;
/**
* 方法调用参数值
*/
private Object[] parameterValue;
public RpcRequestMessage(int sequenceId,String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
super.setSequenceId(sequenceId);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValue = parameterValue;
}
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_REQUEST;
}
}
RpcResponseMessageHandler
说明:针对于服务器响应时封装的RpcResponseMessage消息处理handler。
import com.changlu.message.rpc.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ClassName RpcResponseMessageHandler
* @Author ChangLu
* @Date 2022/1/17 19:22
* @Description 针对于处理RpcResponseMessage的handler
*/
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
public static Map<Integer, Promise> promiseMap = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
log.debug("{}", msg);
final Object returnValue = msg.getReturnValue();
final Exception ex = msg.getExceptionValue();
//根据id取出指定的promise
final Promise<Object> promise = promiseMap.remove(msg.getSequenceId());
if (ex == null){
promise.setSuccess(returnValue);
}else{
promise.setFailure(ex);
}
}
}
Client
说明:建立连接及初始化channel,JDK动态代理实现。
import com.changlu.message.rpc.RpcRequestMessage;
import com.changlu.protocol.MessageCodecSharable;
import com.changlu.protocol.ProcotolFrameDecoder;
import com.changlu.server.handler.rpc.RpcResponseMessageHandler;
import com.changlu.server.service.rpc.RpcService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName RpcClient
* @Author ChangLu
* @Date 2022/1/16 21:45
* @Description RPC远程调用客户端
*/
@Slf4j
public class RpcClient {
private static volatile Channel channel = null;
private static Object lock = new Object();
//并发计数器
private static AtomicInteger seqId = new AtomicInteger(0);
private static <T> T getProxyService(Class<T> clazz) {
ClassLoader classLoader = clazz.getClassLoader();
Class[] classes = {clazz};
return (T)Proxy.newProxyInstance(classLoader, classes, (proxy, method, args)->{
RpcRequestMessage message = new RpcRequestMessage(
seqId.getAndIncrement(),
clazz.getName(), //示例:com.changlu.server.service.rpc.RpcService
method.getName(), //sayHello
method.getReturnType(), //String.class
method.getParameterTypes(), //new Class[]{String.class}
args //new Object[]{"changlu"}
);
//用于其他线程与当前主线程进行通信
final DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
RpcResponseMessageHandler.promiseMap.put(message.getSequenceId(), promise);
//远程调用(获取channel方式是通过单例模式来获取)
getChannel().writeAndFlush(message);
promise.await();//阻塞等待
if (promise.isSuccess()) {
return promise.getNow();
}else{
throw new RuntimeException((Exception)promise.getNow());
}
});
}
/**
* 单例(懒汉,双重检测锁):获取channel
* @return
*/
public static Channel getChannel(){
if (channel != null) {
return channel;
}
synchronized (lock) {
if (channel != null){
return channel;
}
initChannel();//初始化channel
return channel;
}
}
/**
* 初始化channel
*/
private static void initChannel() {
MessageCodecSharable messageCodec = new MessageCodecSharable();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
RpcResponseMessageHandler responseMessageHandler = new RpcResponseMessageHandler();
NioEventLoopGroup worker = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageCodec);
ch.pipeline().addLast(responseMessageHandler);
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//初始测试
// final RpcRequestMessage message = new RpcRequestMessage(
// 1,
// "com.changlu.server.service.rpc.RpcService",
// "sayHello",
// String.class,
// new Class[]{String.class},
// new Object[]{"changlu"}
// );
// ctx.channel().writeAndFlush(message);
}
});
}
});
// Channel channel = null;
try {
channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
log.debug("客户端连接成功!");
channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
log.debug("客户端关闭连接!");
worker.shutdownGracefully();
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
服务端
RpcResponseMessage
说明:服务器端方法调用结束时响应给客户端的对象封装。
import com.changlu.message.Message;
import lombok.Data;
import lombok.ToString;
/**
* @ClassName RpcResponseMessage
* @Author ChangLu
* @Date 2022/1/16 21:38
* @Description RPC消息请求对象
*/
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
/**
* 返回值
*/
private Object returnValue;
/**
* 异常值
*/
private Exception exceptionValue;
public RpcResponseMessage() {
}
public RpcResponseMessage(Object returnValue, Exception exceptionValue) {
this.returnValue = returnValue;
this.exceptionValue = exceptionValue;
}
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_RESPONSE;
}
}
service、serviceImpl
说明:用于进行rpc调用测试的接口及实现类。
/**
* @ClassName RpcService
* @Author ChangLu
* @Date 2022/1/16 22:02
* @Description service接口
*/
public interface RpcService {
/**
*
* @param name 用户名
* @return hello,用户名!
*/
String sayHello(String name);
}
/**
* @ClassName RpcSerivceImpl
* @Author ChangLu
* @Date 2022/1/16 22:03
* @Description RpcService实现类
*/
public class RpcServiceImpl implements RpcService{
@Override
public String sayHello(String name) {
// int i = 1/0; //测试抛出异常
return "hello," + name + "!";
}
}
接口类匹配实现类实例机制(配置文件)
说明:对于指定接口的具体实现类,我们通过读取配置文件来进行处理。
application.properties
:模拟直接从Spring中根据接口来取出指定的实例(或代理类)
# 接口=实现类
com.changlu.server.service.rpc.RpcService=com.changlu.server.service.rpc.RpcServiceImpl
ServicesFactory.java
:之后服务器接收到远程接口调用传来的请求对象即可根据传来的接口取到指定的实现类
import com.sun.deploy.config.Config;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ClassName ServicesFactory
* @Author ChangLu
* @Date 2022/1/16 22:34
* @Description 获取service实现类工厂
*/
public class ServicesFactory {
private static Properties properties;
private static Map<Class<?>, Object> servicesMap = new ConcurrentHashMap<>();
static {
try (InputStream is = Config.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(is);
final Set<String> names = properties.stringPropertyNames();
for (String name : names) {
//匹配所有的Service结尾键值对
if (name.endsWith("Service")){
final Class<?> interfaceClass = Class.forName(name);
final Class<?> instanceClass = Class.forName(properties.getProperty(name));
//map以【接口:实现类实例】存储
servicesMap.put(interfaceClass, instanceClass.newInstance());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//输入:接口class 输出:对应接口实现类实例(properties配置)
public static <T> T getServiceImpl(Class<T> clazz){
return (T) servicesMap.get(clazz);
}
}
RpcRequestMessageHandler
说明:针对于解码得到的RpcRequestMessage对象来进行方法调用,根据响应结果来设置返回值或异常。
import com.changlu.message.rpc.RpcRequestMessage;
import com.changlu.message.rpc.RpcResponseMessage;
import com.changlu.server.service.rpc.ServicesFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Method;
/**
* @ClassName RpcRequestMessageHandler
* @Author ChangLu
* @Date 2022/1/16 22:05
* @Description TODO
*/
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message){
RpcResponseMessage responseMessage = new RpcResponseMessage();
responseMessage.setSequenceId(message.getSequenceId());//将请求序列号存储到响应对象中
Class<?> clazz = null;
Method method = null;
Object returnVal = null;
try {
clazz = Class.forName(message.getInterfaceName());
method = clazz.getMethod(message.getMethodName(), message.getParameterTypes());
returnVal = method.invoke(ServicesFactory.getServiceImpl(clazz), message.getParameterValue());
//运行成功设置值
responseMessage.setReturnValue(returnVal);
} catch (Exception e) {
e.printStackTrace();
//出现异常封装好异常信息后传出
responseMessage.setExceptionValue(new Exception(e.getCause().getMessage()));
}
ctx.channel().writeAndFlush(responseMessage);
}
public static void main(String[] args) throws Exception{
final RpcRequestMessage message = new RpcRequestMessage(
1,
"com.changlu.server.service.rpc.RpcService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"changlu"}
);
final Class<?> clazz = Class.forName(message.getInterfaceName());
final Method method = clazz.getMethod(message.getMethodName(), message.getParameterTypes());
final Object returnVal = method.invoke(ServicesFactory.getServiceImpl(clazz), message.getParameterValue());
System.out.println(returnVal);
//ServicesFactory.getServiceImpl测试:根据配置文件来获取指定实现子类的实例!
// final Object service = ServicesFactory.getServiceImpl(Class.forName(message.getInterfaceName()));
// RpcServiceImpl rpcService = (RpcServiceImpl)service;
// System.out.println(rpcService.sayHello("changlu"));
}
}
Server
说明:服务器实现。
import com.changlu.protocol.MessageCodecSharable;
import com.changlu.protocol.ProcotolFrameDecoder;
import com.changlu.server.handler.rpc.RpcRequestMessageHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* @ClassName RpcServer
* @Author ChangLu
* @Date 2022/1/16 21:45
* @Description RPC服务器
*/
@Slf4j
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup(2);
//handler
MessageCodecSharable messageCodec = new MessageCodecSharable();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
RpcRequestMessageHandler rpcRequestMessageHandler = new RpcRequestMessageHandler();
try {
final ChannelFuture future = new ServerBootstrap()
.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageCodec);
ch.pipeline().addLast(rpcRequestMessageHandler);
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("get => {}", msg);
}
});
}
}).bind(new InetSocketAddress(8080)).sync();
final Channel channel = future.channel();
log.debug("服务器启动成功!");
}catch (Exception e){
e.printStackTrace();
}
}
}
三、测试
该案例针对于远程调用成功、出现异常进行测试。
成功
client:
public class RpcClient {
public static void main(String[] args) {
final RpcService proxyService = getProxyService(RpcService.class);
//测试
System.out.println(proxyService.sayHello("changlu"));
System.out.println(proxyService.sayHello("liner"));
System.out.println(proxyService.sayHello("world!"));
}
}
实际调用实现类:
/**
* @ClassName RpcSerivceImpl
* @Author ChangLu
* @Date 2022/1/16 22:03
* @Description RpcService实现类
*/
public class RpcServiceImpl implements RpcService{
@Override
public String sayHello(String name) {
return "hello," + name + "!";
}
}
结果:
异常
client:调用一次方法即可
public class RpcClient {
public static void main(String[] args) {
final RpcService proxyService = getProxyService(RpcService.class);
//测试
System.out.println(proxyService.sayHello("changlu"));
}
}
/**
* @ClassName RpcSerivceImpl
* @Author ChangLu
* @Date 2022/1/16 22:03
* @Description RpcService实现类
*/
public class RpcServiceImpl implements RpcService{
@Override
public String sayHello(String name) {
int i = 1/0; //测试抛出异常
return "hello," + name + "!";
}
}
结果:
参考文章
[1]. google Gson 无法序列化反序列化 Class 类型
[2]. 黑马netty RPC实现【130-138】:其中的协议、编解码器是基于聊天室案例之上
- 点赞
- 收藏
- 关注作者
评论(0)