Thrift client并发异常之out of sequence

举报
yami_1990 发表于 2020/09/22 09:57:44 2020/09/22
【摘要】 1. 环境异常现象及初步分析 项目中对接外部的thrift服务,使用该服务提供的thrift client进行访问;我们刚开始进行功能验证时一切运行正常,但后来请求量稍微高一点后,就出现了进程卡死的现象,查看后台日志,开始以为是哪个接口有bug导致服务崩溃,后来发现接口是随机性的报错,并且在异常抛出后,client无法再响应后继请求。 跟踪日志查看,每次在出现org....

1. 环境异常现象及初步分析

       项目中对接外部的thrift服务,使用该服务提供的thrift client进行访问;我们刚开始进行功能验证时一切运行正常,但后来请求量稍微高一点后,就出现了进程卡死的现象,查看后台日志,开始以为是哪个接口有bug导致服务崩溃,后来发现接口是随机性的报错,并且在异常抛出后,client无法再响应后继请求。

       跟踪日志查看,每次在出现org.apache.thrift.TApplicationException: xxx  failed: out of sequence response异常后,client便无法进行下一次请求,而out of sequence明显不是我们服务中自己定义的异常,跟进去的代码看下,该异常来自于thrift源码TServiceClient下面这段:

//参见:TServiceClient
//API方法调用时,发送请求数据流
protected void sendBase(String methodName, TBase args) throws TException {
    oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));//首先写入"方法名称"和"seqid_"
    args.write(oprot_);//序列化参数
    oprot_.writeMessageEnd();
    oprot_.getTransport().flush();
}

protected void receiveBase(TBase result, String methodName) throws TException {
    TMessage msg = iprot_.readMessageBegin();//如果执行有异常
    if (msg.type == TMessageType.EXCEPTION) {
    TApplicationException x = TApplicationException.read(iprot_);
    iprot_.readMessageEnd();
    throw x;
    }
    //检测seqid是否一致
    if (msg.seqid != seqid_) {
    throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
    }
    result.read(iprot_);
    //反序列化
    iprot_.readMessageEnd();
}


从上面TApplicationException.BAD_SEQUENCE_ID这块就是我们的异常来源,那么seqid是什么,而我们的msg的seqid又为何不一致了?

 

2. 根因分析

客户端和服务端之间的通讯的媒介是Message,那么我们就简单看下Message的结构是怎样的。通常在thrift自动生成的代码中都会有send_methodname_xxx,receive_methodname_xxx这些方法,而在send方法中的第一句就是写入一个Message Begin。不论不同的协议会如何处理传入参数,我们知道了这个Message Begin是一个org.apache.thrift.protocol.TMessage类型的。那么TMessage又包括了那些东西

public TMessage(String n, byte t, int s) { 
    name = n; type = t; seqid = s; 
} 
public final String name; 
public final byte type; 
public final int seqid;

        由此可见,一个Message应该包括的内容有3个:name,type,seqid。

  •  name:根据上面send方法中的设置来看,name被设置为功能名称,如果使用TMessage的无参数构造器的话,name会被设为空字符串””。

  • type:虽然这是一个byte类型的,但是实际上是从TMessageType中选择的,具体含义见源码,不过如果使用TMessage的无参数构造器的话,type会被置为TType.STOP。

    public final class TMessageType {   public static final byte CALL  = 1;   public static final byte REPLY = 2;   public static final byte EXCEPTION = 3;   public static final byte ONEWAY = 4; }

  • seqid:这个就是用来标记客户端的

        因此,只要我们同一个client的seqid能够对应上就不会出错,那么应该就是我们的client并发请求时响应的seqid乱掉了,导致异常。

3. 锁定原因并解决

        我们client的msg的顺序异常,难道client底层不能并发进行请求,没有个连接池来管理seqid,为了验证猜想,继续查看源码可以看到thrift确实没有连接池,client 中生成的 seqid 只是用来和服务端返回的 rseqid 进行匹配。

func (p *TStandardClient) Recv(iprot TProtocol, seqId int32, method string, result TStruct) error {
    rMethod, rTypeId, rSeqId, err := iprot.ReadMessageBegin()
    if err != nil {
        return err
    }
     
    if method != rMethod {
        return NewTApplicationException(WRONG_METHOD_NAME, fmt.Sprintf("%s: wrong method name", method))
    } else if seqId != rSeqId {
        return NewTApplicationException(BAD_SEQUENCE_ID, fmt.Sprintf("%s: out of order sequence response", method))
    } else if rTypeId == EXCEPTION {
        var exception tApplicationException
        if err := exception.Read(iprot); err != nil {
            return err
        }
     
     if err := iprot.ReadMessageEnd(); err != nil {
         return err
     }
     return &exception
     } else if rTypeId != REPLY {
         return NewTApplicationException(INVALID_MESSAGE_TYPE_EXCEPTION, fmt.Sprintf("%s: invalid message type", method))
     }
     
     if err := result.Read(iprot); err != nil {
         return err
     }
     
     return iprot.ReadMessageEnd()
}

 

thrift 的每个 client 对象中包裹了一个 transport

 ...
    useTransport, err := transportFactory.GetTransport(transport)
    client := NewEchoClientFactory(useTransport, protocolFactory)
    if err := transport.Open(); err != nil {
        fmt.Fprintln(os.Stderr, "Error opening socket to 127.0.0.1:9898", " ", err)
        os.Exit(1)
    }
    defer transport.Close()
     
    req := &EchoReq{Msg: "You are welcome."}
    res, err := client.Echo(context.TODO(), req)
    ...
     
    type EchoClient struct {
        c thrift.TClient
    }
     
    func NewEchoClientFactory(t thrift.TTransport, f thrift.TProtocolFactory) *EchoClient {
        return &EchoClient{
            c: thrift.NewTStandardClient(f.GetProtocol(t), f.GetProtocol(t)),
        }
    }


        这个包裹的 transport 就是一条单独的 tcp 连接,没有连接池,并发请求msg的返回先后就无法保证。

        既然原因是thrift的单个client不能处理并发请求,并且我们对client的使用场景下不需要client单实例,那就自然而然的想到添加一个对象池管理我们的client,这样就能很好的解决我们的client使用问题,最终选择使用了GenericObjectPool来管理我们的对象池。不过每个配置场景也要好好学习下,后面在性能测试时还出现了配置原因导致了性能损耗问题,这是后话。

 

相关参考文档: https://issues.cloudera.org/browse/KITE-328


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。