FIX三天日记-quick fix源码

举报
兔老大 发表于 2022/08/13 01:16:45 2022/08/13
【摘要】 一、概述 1.1 如何阅读? 对于一般人,没必要像对待常用公共组件一样,搞清楚每一个点,我们从使用的角度出发,把我们用到的功能读到即可。 1.2 如何下载 ? https://github.com/quickfix/quickfix 1.3 大概都有哪些? 源码就在src\C++下,我们先大致浏览一下。 DataDicti...

一、概述

1.1 如何阅读?

对于一般人,没必要像对待常用公共组件一样,搞清楚每一个点,我们从使用的角度出发,把我们用到的功能读到即可。

1.2 如何下载 ?

https://github.com/quickfix/quickfix

1.3 大概都有哪些?

源码就在src\C++下,我们先大致浏览一下。

DataDictionary.cpp:解析诸如FIX42.xml的数据字典
Field
.cpp:数据字典中解析预定义的field
Message.cpp:数据字典中解析处理message节点
Http
.cpp: 实现http引擎的部分
Socket.cpp:会话层的通信
Session
.cpp: 会话层的东西
还有一些其他的文件,略去不说。这里还要注意还有几个子文件夹:fix40/,fix41/,fix42/,fix43/,fix44/,fix50/,fix50sp1。这几个文件夹下是具体实现了该版本的一些头文件。

1.4 我会用到哪些?

上篇文章有使用的例子,我们去掉多余部分,拿过来是这样的:


  
  1. int main( int argc, char** argv )
  2. {
  3. FIX::Initiator * initiator = 0;
  4. try
  5. {
  6. FIX::SessionSettings settings( file );
  7. Application application;
  8. FIX::FileStoreFactory storeFactory( settings );
  9. FIX::ScreenLogFactory logFactory( settings );
  10. initiator = new FIX::SocketInitiator( application, storeFactory,
  11. settings, logFactory );
  12. initiator->start();
  13. application.run();
  14. initiator->stop();
  15. delete initiator;
  16. '''
  17. }
  18. catch ( std::exception & e )
  19. {
  20. '''
  21. }
  22. }

请记住每一行代码,接下来,本文基本是每章讲解本代码中的一行。

二、SessionSettings

就是这一行:FIX::SessionSettings settings( file );

2.1 数据字典

Quickfix中进行数据字典的载入,解析本质是对几个xml文件的解析,是采用pugixml parser,官方网站:pugixml.org - Home。正如官网介绍的那样:

Light-weight, simple and fast XML parser for C++ with XPath support

然后Quickfix中在之上进行了一层自己的封装,形成PUGIXML_DOMAttributes类,PUGIXML_DOMNode类,PUGIXML_DOMDocument类。在头文件”PUGIXML_DOMDocument.h”中进行了定义,如下:


  
  1. class PUGIXML_DOMAttributes : public DOMAttributes
  2. {
  3. public:
  4. PUGIXML_DOMAttributes( pugi::xml_node pNode )
  5. : m_pNode(pNode) {}
  6. bool get( const std::string&, std::string& );
  7. DOMAttributes::map toMap();
  8. private:
  9. pugi::xml_node m_pNode;
  10. };
  11. /// XML node as represented by pugixml.
  12. class PUGIXML_DOMNode : public DOMNode
  13. {
  14. public:
  15. PUGIXML_DOMNode( pugi::xml_node pNode )
  16. : m_pNode(pNode) {}
  17. ~PUGIXML_DOMNode() {}
  18. DOMNodePtr getFirstChildNode();
  19. DOMNodePtr getNextSiblingNode();
  20. DOMAttributesPtr getAttributes();
  21. std::string getName();
  22. std::string getText();
  23. private:
  24. pugi::xml_node m_pNode;
  25. };
  26. /// XML document as represented by pugixml.
  27. class PUGIXML_DOMDocument : public DOMDocument
  28. {
  29. public:
  30. PUGIXML_DOMDocument() throw( ConfigError );
  31. ~PUGIXML_DOMDocument();
  32. bool load( std::istream& );
  33. bool load( const std::string& );
  34. bool xml( std::ostream& );
  35. DOMNodePtr getNode( const std::string& );
  36. private:
  37. pugi::xml_document m_pDoc;
  38. };
  39. }

 

 其中大多数函数不需要特别关心,我们只需要重点关心PUGIXML_DOMDocument类中的load()函数,这也是最重要+最复杂的函数。


  
  1. bool PUGIXML_DOMDocument::load( std::istream& stream )
  2. {
  3. try
  4. {
  5. return m_pDoc.load(stream);
  6. }
  7. catch( ... ) { return false; }
  8. }
  9. bool PUGIXML_DOMDocument::load( const std::string& url )
  10. {
  11. try
  12. {
  13. return m_pDoc.load_file(url.c_str());
  14. }
  15. catch( ... ) { return false; }
  16. }

这个函数就是对给定一个xml路径然后装载后返回一个pugi::xml_document的对象。

2.2 数据字典解析

 上面的类实现了诸如FIX44.xml的载入处理,数据字典中定义了很多结构节点,比如fields,messages,groups等,DataDictionary.cpp是真正对这些xml文件进行解析的源文件。DataDictionary.h中部分源代码如下:


  
  1. class DataDictionary
  2. {
  3. typedef std::set < int > MsgFields;
  4. typedef std::map < std::string, MsgFields > MsgTypeToField;
  5. typedef std::set < std::string > MsgTypes;
  6. typedef std::set < int > Fields;
  7. typedef std::map < int, bool > NonBodyFields;
  8. typedef std::vector< int > OrderedFields;
  9. typedef message_order OrderedFieldsArray;
  10. typedef std::map < int, TYPE::Type > FieldTypes;
  11. typedef std::set < std::string > Values;
  12. typedef std::map < int, Values > FieldToValue;
  13. typedef std::map < int, std::string > FieldToName;
  14. typedef std::map < std::string, int > NameToField;
  15. typedef std::map < std::pair < int, std::string > , std::string > ValueToName;
  16. // while FieldToGroup structure seems to be overcomplicated
  17. // in reality it yields a lot of performance because:
  18. // 1) avoids memory copying;
  19. // 2) first lookup is done by comparing integers and not string objects
  20. // TODO: use hash_map with good hashing algorithm
  21. typedef std::map < std::string, std::pair < int, DataDictionary* > > FieldPresenceMap;
  22. typedef std::map < int, FieldPresenceMap > FieldToGroup;
  23. public:
  24. DataDictionary();
  25. DataDictionary( const DataDictionary& copy );
  26. DataDictionary( std::istream& stream ) throw( ConfigError );
  27. DataDictionary( const std::string& url ) throw( ConfigError );
  28. virtual ~DataDictionary();
  29. void readFromURL( const std::string& url ) throw( ConfigError );
  30. void readFromDocument( DOMDocumentPtr pDoc ) throw( ConfigError );
  31. void readFromStream( std::istream& stream ) throw( ConfigError );
  32. ......
  33. };
  34. ....

 可以看到DataDictionary类中定义了很多的std::map和std::vector,这些容器都是用来存储从FIX4X.xml文件中解析来的内容,一些映射,但是是否过于繁琐,我没有深究。

比如:

typedef std::map < int, std::string > FieldToName;

 

 

表示存储field和实际的字段名的映射,比如8对应BeginString;

typedef std::map < int, Values > FieldToValue;
 

表示枚举当中的int值跟实际的字段名的映射,比如:


  
  1. <field number='13' name='CommType' type='CHAR'>
  2. <value enum='1' description='PER_UNIT' />
  3. <value enum='2' description='PERCENTAGE' />
  4. <value enum='3' description='ABSOLUTE' />
  5. <value enum='4' description='4' />
  6. <value enum='5' description='5' />
  7. <value enum='6' description='POINTS_PER_BOND_OR_CONTRACT_SUPPLY_CONTRACTMULTIPLIER' />
  8. </field>

3代表ABSOLUTE;1代表PER_UNIT

另外需要注意的成员函数readFrom*()系列,底层就是上一章中的类,进行xml的载入。


  
  1. void DataDictionary::readFromURL( const std::string& url )
  2. throw( ConfigError )
  3. {
  4. DOMDocumentPtr pDoc = DOMDocumentPtr(new PUGIXML_DOMDocument());
  5. if(!pDoc->load(url))
  6. ¦ throw ConfigError(url + ": Could not parse data dictionary file");
  7. try
  8. {
  9. ¦ readFromDocument( pDoc );
  10. }
  11. catch( ConfigError& e )
  12. {
  13. ¦ throw ConfigError( url + ": " + e.what() );
  14. }
  15. }
  16. void DataDictionary::readFromStream( std::istream& stream )
  17. throw( ConfigError )
  18. {
  19. >* DOMDocumentPtr pDoc = DOMDocumentPtr(new PUGIXML_DOMDocument());
  20. if(!pDoc->load(stream))
  21. ¦ throw ConfigError("Could not parse data dictionary stream");
  22. readFromDocument( pDoc );
  23. }
  24. >*void DataDictionary::readFromDocument( DOMDocumentPtr pDoc )
  25. throw( ConfigError )
  26. {
  27. // VERSION
  28. DOMNodePtr pFixNode = pDoc->getNode("/fix");
  29. if(!pFixNode.get())
  30. ...
  31. }

到这里,数据字典的解析就完成了。简单的理解就是,读入xml文件,然后针对xml文件里的内容,把内容做成映射用map和vector存储。

2.3 数据字典存储

SessionSettings


  
  1. /// Container for setting dictionaries mapped to sessions.
  2. class SessionSettings
  3. {
  4. public:
  5. SessionSettings() { m_resolveEnvVars = false; }
  6. SessionSettings( std::istream& stream, bool resolveEnvVars = false ) EXCEPT ( ConfigError );
  7. SessionSettings( const std::string& file, bool resolveEnvVars = false ) EXCEPT ( ConfigError );
  8. ''''''
  9. typedef std::map < SessionID, Dictionary > Dictionaries;
  10. std::set < SessionID > getSessions() const;
  11. private:
  12. Dictionaries m_settings;
  13. Dictionary m_defaults;
  14. '''
  15. friend std::istream& operator>>( std::istream&, SessionSettings& ) EXCEPT ( ConfigError );
  16. friend std::ostream& operator<<( std::ostream&, const SessionSettings& );
  17. };

是通过友元函数 operator >> 从任意的流中读取配置,通过一个sessonid的set和一个sessionid->dictionary的map,管理每个段。

三、Application

3.1 Application

若是须要使用QuickFIX开发FIX应用,则须要实现FIX::Application接口,并重载不一样FIX协议版本的MessageCracker::OnMessage接口,如FIX42::MessageCracker。


  
  1. class Application
  2. {
  3. public:
  4.   virtual ~Application() {};
  5.   /// Notification of a session begin created
  6.   virtual void onCreate( const SessionID& ) = 0;
  7.  
  8.   /// Notification of a session successfully logging on
  9.   virtual void onLogon( const SessionID& ) = 0;
  10.  
  11.   /// Notification of a session logging off or disconnecting
  12.   virtual void onLogout( const SessionID& ) = 0;
  13.  
  14.   /// Notification of admin message being sent to target
  15.   virtual void toAdmin( Message&, const SessionID& ) = 0;
  16.  
  17.   /// Notification of app message being sent to target
  18.   virtual void toApp( Message&, const SessionID& )
  19.   EXCEPT ( DoNotSend ) = 0;
  20.  
  21.   /// Notification of admin message being received from target
  22.   virtual void fromAdmin( const Message&, const SessionID& )
  23.   EXCEPT ( FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon ) = 0;
  24.  
  25.   /// Notification of app message being received from target
  26.   virtual void fromApp( const Message&, const SessionID& )
  27.   EXCEPT ( FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType ) = 0;
  28. };


 onCreate:当Fix Session创建时调用。
onLogon:当Fix Session登陆成功时调用。
onLogout:当Fix Session退出时调用。
fromAdmin:当收到一个Admin类型消息时调用。
fromApp:当收到一个不属于Admin 类型消息时调用。
toAdmin:当发送一个admin类型消息调用。
toApp:当发送一个非admin(业务类型)消息调用。

admin一般是服务提供方,app是客户端


3.2 MessageCracker

除了实现FIX::Application接口,还需要重新实现FIX::MessageCracker从具体的FIX协议版本实现继承而来的onMessage方法,crack接口就可以根据message类型匹配到你实现的具体onMessage接口上。


  
  1. void crack( const Message& message,
  2. const SessionID& sessionID )
  3. {
  4. const FIX::BeginString& beginString =
  5. FIELD_GET_REF( message.getHeader(), BeginString );
  6. crack( message, sessionID, beginString );
  7. }
  8. void crack( const Message& message,
  9. const SessionID& sessionID,
  10. const BeginString& beginString )
  11. {
  12. if ( beginString == BeginString_FIX40 )
  13. ((FIX40::MessageCracker&)(*this)).crack((const FIX40::Message&) message, sessionID);
  14. else if ( beginString == BeginString_FIX41 )
  15. ((FIX41::MessageCracker&)(*this)).crack((const FIX41::Message&) message, sessionID);
  16. else if ( beginString == BeginString_FIX42 )
  17. ((FIX42::MessageCracker&)(*this)).crack((const FIX42::Message&) message, sessionID);
  18. else if ( beginString == BeginString_FIX43 )
  19. ((FIX43::MessageCracker&)(*this)).crack((const FIX43::Message&) message, sessionID);
  20. else if ( beginString == BeginString_FIX44 )
  21. ((FIX44::MessageCracker&)(*this)).crack((const FIX44::Message&) message, sessionID);
  22. else if ( beginString == BeginString_FIXT11 )
  23. {
  24. if( message.isAdmin() )
  25. {
  26. ((FIXT11::MessageCracker&)(*this)).crack((const FIXT11::Message&) message, sessionID);
  27. }
  28. else
  29. {
  30. '''
  31. }
  32. }
  33. }

四、*Factory 

就是这两行:

FIX::FileStoreFactory storeFactory( settings );

FIX::ScreenLogFactory logFactory( settings );

逻辑比较简单,就是读了上文介绍的settings,然后存下来,存储结构如下:

  std::string m_path;

  SessionSettings m_settings;

五、initiator/Acceptor

也就是这一行 initiator = new FIX::SocketInitiator( application, storeFactory, settings, logFactory );

这俩大概差不多,先看一个。

主要代码如下:


  
  1. /**
  2. * Base for classes which act as an acceptor for incoming connections.
  3. *
  4. * Most users will not need to implement one of these. The default
  5. * SocketAcceptor implementation will be used in most cases.
  6. */
  7. class Acceptor
  8. {
  9. public:
  10. ''''''
  11. Acceptor( Application&, MessageStoreFactory&,
  12. const SessionSettings&, LogFactory& ) EXCEPT ( ConfigError );
  13. virtual ~Acceptor();
  14. ''''''
  15. /// Poll the acceptor
  16. bool poll( double timeout = 0.0 ) EXCEPT ( ConfigError, RuntimeError );
  17. /// Stop acceptor.
  18. void stop( bool force = false );
  19. /// Check to see if any sessions are currently logged on
  20. bool isLoggedOn();
  21. Session* getSession( const std::string& msg, Responder& );
  22. const std::set<SessionID>& getSessions() const { return m_sessionIDs; }
  23. Session* getSession( const SessionID& sessionID ) const;
  24. const Dictionary* const getSessionSettings( const SessionID& sessionID ) const;
  25. bool has( const SessionID& id )
  26. { return m_sessions.find( id ) != m_sessions.end(); }
  27. bool isStopped() { return m_stop; }
  28. Application& getApplication() { return m_application; }
  29. MessageStoreFactory& getMessageStoreFactory()
  30. { return m_messageStoreFactory; }
  31. private:
  32. ''''''
  33. static THREAD_PROC startThread( void* p );
  34. typedef std::set < SessionID > SessionIDs;
  35. typedef std::map < SessionID, Session* > Sessions;
  36. thread_id m_threadid;
  37. Sessions m_sessions;
  38. SessionIDs m_sessionIDs;
  39. Application& m_application;
  40. MessageStoreFactory& m_messageStoreFactory;
  41. protected:
  42. SessionSettings m_settings;
  43. private:
  44. LogFactory* m_pLogFactory;
  45. Log* m_pLog;
  46. NullLog m_nullLog;
  47. bool m_firstPoll;
  48. bool m_stop;
  49. };

基本包含了之前介绍的大部分类,如

Session相关的(SessionSettings/set<SessionID>/map<SessionID, Session*>)、

Application(用于接收并处理消息的)、LogFactory(写日志的对象)

5.1 init

功能就是把配置的每一个session初始化,很简单。


  
  1. void Acceptor::initialize() EXCEPT ( ConfigError )
  2. {
  3. std::set < SessionID > sessions = m_settings.getSessions();
  4. std::set < SessionID > ::iterator i;
  5. if ( !sessions.size() )
  6. throw ConfigError( "No sessions defined" );
  7. SessionFactory factory( m_application, m_messageStoreFactory,
  8. m_pLogFactory );
  9. for ( i = sessions.begin(); i != sessions.end(); ++i )
  10. {
  11. if ( m_settings.get( *i ).getString( CONNECTION_TYPE ) == "acceptor" )
  12. {
  13. m_sessionIDs.insert( *i );
  14. m_sessions[ *i ] = factory.create( *i, m_settings.get( *i ) );
  15. }
  16. }
  17. if ( !m_sessions.size() )
  18. throw ConfigError( "No sessions defined for acceptor" );
  19. }

5.2 start

这一行:Acceptor/initiator->start();

  1. 调用 SocketAcceptor::onInitialize() 创建 socket 句柄,进行监听端口。
  2. 启动线程,调用 SocketAcceptor::onStart(),检测对端的连接

  
  1. void Acceptor::start() EXCEPT ( ConfigError, RuntimeError )
  2. {
  3. m_stop = false;
  4. onConfigure( m_settings );
  5. onInitialize( m_settings );
  6. HttpServer::startGlobal( m_settings );
  7. if( !thread_spawn( &startThread, this, m_threadid ) )
  8. throw RuntimeError("Unable to spawn thread");
  9. }

其他的操作大同小异,可以自己阅读

5.3 SocketAcceptor::onInitialize

主要功能就是对每个session设置监听


  
  1. void SocketAcceptor::onInitialize(const SessionSettings& s)
  2. EXCEPT ( RuntimeError )
  3. {
  4. short port = 0;
  5. try
  6. {
  7. m_pServer = new SocketServer(1);
  8. std::set<SessionID> sessions = s.getSessions();
  9. std::set<SessionID>::iterator i = sessions.begin();
  10. for( ; i != sessions.end(); ++i )
  11. {
  12. const Dictionary& settings = s.get( *i );
  13. port = (short)settings.getInt( SOCKET_ACCEPT_PORT );
  14. ''''''
  15. // 管理监听端口与 SeesionID 的对应关系
  16. m_portToSessions[port].insert(*i);
  17. // 为每个监听的端口创建 Socket 句柄: socket_handle
  18. m_pServer->add( port, reuseAddress, noDelay, sendBufSize, rcvBufSize );
  19. }
  20. }
  21. catch( SocketException& e )
  22. {
  23. ''''''
  24. }
  25. }

5.4 

5.2中的第二步调用


  
  1. THREAD_PROC Acceptor::startThread( void* p )
  2. {
  3. Acceptor * pAcceptor = static_cast < Acceptor* > ( p );
  4. pAcceptor->onStart();
  5. return 0;
  6. }

六、session

回顾所有我们浏览的代码,唯独没有介绍session,最后来看一下。

6.1 session创建

用factory(初始化心跳、session)


  
  1. Session* SessionFactory::create( const SessionID& sessionID,
  2. const Dictionary& settings ) EXCEPT ( ConfigError )
  3. {
  4. std::string connectionType = settings.getString( CONNECTION_TYPE );
  5. if ( connectionType != "acceptor" && connectionType != "initiator" )
  6. throw ConfigError( "Invalid ConnectionType" );
  7. if( connectionType == "acceptor" && settings.has(SESSION_QUALIFIER) )
  8. throw ConfigError( "SessionQualifier cannot be used with acceptor." );
  9. // 初始化心跳
  10. HeartBtInt heartBtInt( 0 );
  11. if ( connectionType == "initiator" )
  12. {
  13. heartBtInt = HeartBtInt( settings.getInt( HEARTBTINT ) );
  14. if ( heartBtInt <= 0 ) throw ConfigError( "Heartbeat must be greater than zero" );
  15. }
  16. // 创建 Session 对象
  17. SmartPtr<Session> pSession;
  18. pSession.reset( new Session( m_application, m_messageStoreFactory,
  19. sessionID, dataDictionaryProvider, sessionTimeRange,
  20. heartBtInt, m_pLogFactory ) );
  21. return pSession.release();
  22. }

其中session对象内属性太多,挑一些重要的看:

Application(会话)、

SessionID(标识唯一session)、

m_sessionTime/m_logonTime(主要用于之前讲的24小时重新连接,对应配置)、

m_senderDefaultApplVerID/m_targetDefaultApplVerID(发送端/接收端默 Fix 协议版本号)、

m_state(session状态)、

send()(发送消息函数)、

next()(处理收到的消息,比较重要)

6.2 next()

精简过的代码如下


  
  1. void Session::next( const Message& message, const UtcTimeStamp& timeStamp, bool queued )
  2. {
  3. const Header& header = message.getHeader();
  4. try
  5. {
  6. //检查时间
  7. if ( !checkSessionTime(timeStamp) )
  8. { reset(); return; }
  9. //获取类型,下面根据类型分处理方法
  10. const MsgType& msgType = FIELD_GET_REF( header, MsgType );
  11. //校验时间
  12. const BeginString& beginString = FIELD_GET_REF( header, BeginString );
  13. // make sure these fields are present
  14. FIELD_THROW_IF_NOT_FOUND( header, SenderCompID );
  15. FIELD_THROW_IF_NOT_FOUND( header, TargetCompID );
  16. if ( beginString != m_sessionID.getBeginString() )
  17. throw UnsupportedVersion();
  18. const DataDictionary& sessionDataDictionary =
  19. m_dataDictionaryProvider.getSessionDataDictionary(m_sessionID.getBeginString());
  20. if( m_sessionID.isFIXT() && message.isApp() )
  21. {
  22. ApplVerID applVerID = m_targetDefaultApplVerID;
  23. header.getFieldIfSet(applVerID);
  24. const DataDictionary& applicationDataDictionary =
  25. m_dataDictionaryProvider.getApplicationDataDictionary(applVerID);
  26. DataDictionary::validate( message, &sessionDataDictionary, &applicationDataDictionary );
  27. }
  28. else
  29. {
  30. sessionDataDictionary.validate( message );
  31. }
  32. if ( msgType == MsgType_Logon )
  33. nextLogon( message, timeStamp );
  34. else if ( msgType == MsgType_Heartbeat )
  35. nextHeartbeat( message, timeStamp );
  36. else if ( msgType == MsgType_TestRequest )
  37. nextTestRequest( message, timeStamp );
  38. else if ( msgType == MsgType_SequenceReset )
  39. nextSequenceReset( message, timeStamp );
  40. else if ( msgType == MsgType_Logout )
  41. nextLogout( message, timeStamp );
  42. else if ( msgType == MsgType_ResendRequest )
  43. nextResendRequest( message, timeStamp );
  44. else if ( msgType == MsgType_Reject )
  45. nextReject( message, timeStamp );
  46. else
  47. {
  48. if ( !verify( message ) ) return ;
  49. //内含Session::doTargetTooLow() 来处理序列号过小的消息
  50. // Session::doTargetTooHigh() 来处理序列号过大的消息
  51. m_state.incrNextTargetMsgSeqNum();
  52. }
  53. }
  54. ''''''
  55. if( !queued )
  56. nextQueued( timeStamp );
  57. if( isLoggedOn() )
  58. next();
  59. }

经过各种检查后,根据type调用不同的处理方法,然后操作queue进行下次操作。

这里调用的函数太多了,挑一个复杂的看一下。

6.3 nextResendRequest()

当收到 type是ResendRequest 消息时,回调用nextResendRequest() 处理:


  
  1. void Session::nextResendRequest(const Message& resendRequest, const UtcTimeStamp& timeStamp)
  2. {
  3. // ...
  4. // 从缓存拿出需要重传的消息片段(从MessageStore中的消息,如果是FileStore,那么就会从文件中取出)
  5. std::vector < std::string > messages;
  6. m_state.get( beginSeqNo, endSeqNo, messages );
  7. // ...
  8. for ( i = messages.begin(); i != messages.end(); ++i )
  9. {
  10. // 重新计算消息的校验和
  11. // ...
  12. if ( Message::isAdminMsgType( msgType ) )
  13. {
  14. // 跳过管理消息
  15. if ( !begin ) begin = msgSeqNum;
  16. }
  17. else
  18. {
  19. // 在 resend 里会回调 Application::toApp
  20. if ( resend( msg ) )
  21. {
  22. // 有需要跳过的管理消息,则用一条 SeqReset-GapFill 消息替代
  23. if ( begin ) generateSequenceReset( begin, msgSeqNum );
  24. // 发送应用消息
  25. send( msg.toString(messageString) );
  26. m_state.onEvent( "Resending Message: "
  27. + IntConvertor::convert( msgSeqNum ) );
  28. begin = 0;
  29. appMessageJustSent = true;
  30. }
  31. else
  32. { if ( !begin ) begin = msgSeqNum; }
  33. }
  34. current = msgSeqNum + 1;
  35. }
  36. // 结尾还有需要跳过的管理消息,需要用一条 SeqReset-GapFill 消息替代
  37. if ( begin )
  38. {
  39. generateSequenceReset( begin, msgSeqNum + 1 );
  40. }
  41. // 序列号同步。为什么在重传借宿后还需要再发送一个 SeqReset-GapFill 消息?
  42. if ( endSeqNo > msgSeqNum )
  43. {
  44. endSeqNo = EndSeqNo(endSeqNo + 1);
  45. int next = m_state.getNextSenderMsgSeqNum();
  46. if( endSeqNo > next )
  47. endSeqNo = EndSeqNo(next);
  48. if ( appMessageJustSent )
  49. beginSeqNo = msgSeqNum + 1;
  50. generateSequenceReset( beginSeqNo, endSeqNo );
  51. }
  52. resendRequest.getHeader().getField( msgSeqNum );
  53. if( !isTargetTooHigh(msgSeqNum) && !isTargetTooLow(msgSeqNum) )
  54. m_state.incrNextTargetMsgSeqNum();
  55. }

作者修行尚浅,这里只是浅读一下源码,由于使用经验不足,肯定对一些知识的认识不足,以后多加改正。

文章来源: fantianzuo.blog.csdn.net,作者:兔老大RabbitMQ,版权归原作者所有,如需转载,请联系作者。

原文链接:fantianzuo.blog.csdn.net/article/details/126285739

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。