基于Thrift Protocol实现自定义信息传输框架
基于Thrift Protocol实现自定义信息传输框架
1 背景
基于开源项目进行二次开发,尤其对于使用Thrift协议的项目,对现有接口进行自定义信息传输往往需要修改现有接口定义和数据结构,对开源项目往往产生较大冲击,对于后续复用开源项目最新特性提升了改造成本,并且严重影响与开源社区的兼容性。
比如基于Hive MetaStore服务追加调用链跟踪特性,最简单的方案就是修改现有的协议和数据结构,增加诸如traceId等信息用于实现调用链能力,但该方案会极大地冲击开源项目现有的协议规范,并带来诸如维护和兼容性问题。
基于上述背景,提出一种基于Thrift Protocol实现自定义信息传输的解决方案,可以在不变动现有服务协议和数据结构的前提下,仅通过较少的协议修改和Class引用修改达到自定义信息传输的目的。
2 方案设计
2.1 Thrift网络栈简介
Thrift网络栈示例如下:(参考官网)
Transport: 与底层传输系统结构,提供网络数据读写能力;
Protocol: 定义待传输数据的序列化/反序列化能力;
Processor: 提供从输入流读数据、往输出流写数据能力,输入、输出流即Protocol表示的数据流对象;
Server: 基于Transport/Protocol/Processor提供RPC服务能力,等待客户端进行远程访问,提供多种服务模型。
2.2 Thrift 消息格式
Thrift 消息格式主要分为四种:<参考org.apache.thrift.protocol.TProtocol>
1. Message,读写消息头,消息头包含了版本号(version),方法名(name),序列号(seqid)等信息
2. Struct,将 RPC 方法的参数/返回值封装成结构体,读写结构体即表示要读写 RPC 方法参数了
3. Field,每一个参数都被抽象成 Field,Field 主要包含了字段的索引信息,类型信息等
4. Type,即读写各种具体的数据
2.3 自定义信息传输框架
TBinaryProtocol(com.huawei.dlcatalog.thrift):继承至TBinaryProtocol,在现有read/write基础上,追加读写自定义信息Header逻辑;
TProcessor(com.huawei.dlcatalog.thrift):继承至TProcessor,实现对自定义信息Header的读写逻辑,并借助ProcessFunction将自定义信息向下传递给Server接口上下文信息中(基于Iface实现的Handler需要自实现上下文,用于承载自定义信息并实现与请求绑定,避免映射错乱,也避免对服务端接口规范有冲击从而有变动)【本文示例中给了一个简单TContext实现】;
3 方案实现
针对第二章的内容,进行核心代码展示,具体如下。
3.1 TBinaryProtocol(com.huawei.dlcatalog.thrift)
通过thrift网络框架可以看出,protocol定义待传输数据的序列化/反序列化能力,那么对于自定义信息的传输,只要保证序列化顺序与反序列化顺序一致即可。具体实现如下:
3.1.1 写自定义信息
@Override
public void writeMessageBegin(TMessage message) throws TException {
super.writeMessageBegin(message);
TField tHead = new TField("HEADERS", TType.MAP, (short) 0);
this.writeFieldBegin(tHead);
writeMapBegin(new TMap(TType.STRING, TType.STRING, headers.size()));
for (Map.Entry<String, String> entry : headers.entrySet()) {
this.writeString(entry.getKey());
this.writeString(entry.getValue());
}
writeMapEnd();
this.writeFieldEnd();
}
3.1.2 读自定义信息
public Map<String, String> readHeaderField() throws TException {
Map<String, String> headers = new HashMap<>();
TField schemeField = this.readFieldBegin();
if (schemeField.id == 0 && schemeField.type == TType.MAP) {
TMap tMap = this.readMapBegin();
for (int i = 0; i < tMap.size; ++i) {
String key = this.readString();
String value = this.readString();
headers.put(key, value);
}
this.readMapEnd();
}
this.readFieldEnd();
return headers;
}
3.2 TProcessor(com.huawei.dlcatalog.thrift)
TProcessor(com.huawei.dlcatalog.thrift) 继承至TProcessor,实现对自定义信息Header的读写逻辑,并借助ProcessFunction将自定义信息向下传递给Server接口上下文信息中(基于Iface实现的Handler需要自实现上下文,用于承载自定义信息并实现与请求绑定,避免映射错乱,也避免对服务端接口规范有冲击从而有变动)
3.2.1 读取header并透传实例
public void process(TProtocol in, TProtocol out) throws TException {
Map<String, String> headers = null;
if (in instanceof TBinaryProtocol) {
TBinaryProtocol serverProtocol = (TBinaryProtocol) in;
// read header and reset, not effect the origin stream
serverProtocol.markTFramedTransport(in);
serverProtocol.readMessageBegin();
headers = serverProtocol.readHeaderField();
System.out.println(headers.toString());
serverProtocol.readMessageEnd();
// prepare request context.
serverProtocol.resetTFramedTransport(in);
}
tProcessor.process(in, out, headers);
}
3.3 ProcessFunction(com.huawei.dlcatalog.thrift)
承接TProcessor分发的请求并进行处理,在此将自定义的信息填入到请求的上下文中(以TContext为示例,将TContext与Handler绑定----仅用于示意,存在并发问题)
public final void process(int seqid, TProtocol iprot, TProtocol oprot, I iface, Map<String, String> headers) throws TException {
……
try {
if (iface instanceof TBaseHandler) {
((TBaseHandler) iface).setContext(new TContext(headers));
}
result = getResult(iface, args);
……
TBaseHandler 用于示意如何将获取到的自定义信息填充到请求的上下文中,仅用于示意,存在并发问题。
4 对接流程
1. 替换客户端、服务端初始化协议为com.huawei.dlcatalog.thrift.TBinaryProtocol(对于原服务模型非采用TBinaryProtocol应用,可以参考本方案进行类比实现);
2. 接口实现类集成至TBaseHadnler,用于辅助ProcessFunction进行当前请求上下文设置;
3. Thrift生成的代码中,将原有的TBaseProcessor/ProcessFunction切换为com.huawei.dlcatalog.thrift package下定义;
5 Demo
具体demo参考附件[example.rar]
6 参考资料
1. Thrift官网 http://thrift.apache.org/
2. 基于 Thrift RPC 的调用链跟踪 https://fredal.xin/hunter-with-thrift
- 点赞
- 收藏
- 关注作者
评论(0)