FIX三天日记-quick fix源码
一、概述
1.1 如何阅读?
对于一般人,没必要像对待常用公共组件一样,搞清楚每一个点,我们从使用的角度出发,把我们用到的功能读到即可。
1.2 如何下载 ?
https://github.com/quickfix/quickfix
1.3 大概都有哪些?
源码就在src\C++下,我们先大致浏览一下。
DataDictionary.cpp:解析诸如FIX42.xml的数据字典
 Field.cpp:数据字典中解析预定义的field
 Message.cpp:数据字典中解析处理message节点
 Http.cpp: 实现http引擎的部分
 Socket.cpp:会话层的通信
 Session.cpp: 会话层的东西
 还有一些其他的文件,略去不说。这里还要注意还有几个子文件夹:fix40/,fix41/,fix42/,fix43/,fix44/,fix50/,fix50sp1。这几个文件夹下是具体实现了该版本的一些头文件。
1.4 我会用到哪些?
上篇文章有使用的例子,我们去掉多余部分,拿过来是这样的:
  
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      int main( int argc, char** argv )
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
     
        FIX::Initiator * initiator = 0;
     
    
 
   - 
    
     
    
    
       try
     
    
 
   - 
    
     
    
    
     
        {
     
    
 
   - 
    
     
    
    
         FIX::SessionSettings settings( file );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
          Application application;
     
    
 
   - 
    
     
    
    
         FIX::FileStoreFactory storeFactory( settings );
     
    
 
   - 
    
     
    
    
         FIX::ScreenLogFactory logFactory( settings );
     
    
 
   - 
    
     
    
    
     
          initiator = new FIX::SocketInitiator( application, storeFactory, 
     
    
 
   - 
    
     
    
    
     
                                                settings, logFactory );
     
    
 
   - 
    
     
    
    
     
          initiator->start();
     
    
 
   - 
    
     
    
    
     
          application.run();
     
    
 
   - 
    
     
    
    
     
          initiator->stop();
     
    
 
   - 
    
     
    
    
         delete initiator;
     
    
 
   - 
    
     
    
    
     
      '''
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
        }
     
    
 
   - 
    
     
    
    
       catch ( std::exception & e )
     
    
 
   - 
    
     
    
    
     
        {
     
    
 
   - 
    
     
    
    
     
      '''
     
    
 
   - 
    
     
    
    
     
        }
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
请记住每一行代码,接下来,本文基本是每章讲解本代码中的一行。
二、SessionSettings
就是这一行:FIX::SessionSettings settings( file );
2.1 数据字典
Quickfix中进行数据字典的载入,解析本质是对几个xml文件的解析,是采用pugixml parser,官方网站:pugixml.org - Home。正如官网介绍的那样:
Light-weight, simple and fast XML parser for C++ with XPath support
然后Quickfix中在之上进行了一层自己的封装,形成PUGIXML_DOMAttributes类,PUGIXML_DOMNode类,PUGIXML_DOMDocument类。在头文件”PUGIXML_DOMDocument.h”中进行了定义,如下:
  
   - 
    
     
    
    
      class PUGIXML_DOMAttributes : public DOMAttributes
     
    
 
   - 
    
     
    
    
     
        {
     
    
 
   - 
    
     
    
    
       public:
     
    
 
   - 
    
     
    
    
         PUGIXML_DOMAttributes( pugi::xml_node pNode )
     
    
 
   - 
    
     
    
    
     
          : m_pNode(pNode) {}
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         bool get( const std::string&, std::string& );
     
    
 
   - 
    
     
    
    
         DOMAttributes::map toMap();
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       private:
     
    
 
   - 
    
     
    
    
     
          pugi::xml_node m_pNode;
     
    
 
   - 
    
     
    
    
     
        };
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       /// XML node as represented by pugixml.
     
    
 
   - 
    
     
    
    
       class PUGIXML_DOMNode : public DOMNode
     
    
 
   - 
    
     
    
    
     
        {
     
    
 
   - 
    
     
    
    
       public:
     
    
 
   - 
    
     
    
    
         PUGIXML_DOMNode( pugi::xml_node pNode )
     
    
 
   - 
    
     
    
    
     
          : m_pNode(pNode) {}
     
    
 
   - 
    
     
    
    
     
          ~PUGIXML_DOMNode() {}
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         DOMNodePtr getFirstChildNode();
     
    
 
   - 
    
     
    
    
         DOMNodePtr getNextSiblingNode();
     
    
 
   - 
    
     
    
    
         DOMAttributesPtr getAttributes();
     
    
 
   - 
    
     
    
    
         std::string getName();
     
    
 
   - 
    
     
    
    
         std::string getText();
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       private:
     
    
 
   - 
    
     
    
    
     
          pugi::xml_node m_pNode;
     
    
 
   - 
    
     
    
    
     
        };
     
    
 
   - 
    
     
    
    
        /// XML document as represented by pugixml.
     
    
 
   - 
    
     
    
    
       class PUGIXML_DOMDocument : public DOMDocument
     
    
 
   - 
    
     
    
    
     
        {
     
    
 
   - 
    
     
    
    
       public:
     
    
 
   - 
    
     
    
    
         PUGIXML_DOMDocument() throw( ConfigError );
     
    
 
   - 
    
     
    
    
     
          ~PUGIXML_DOMDocument();
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         bool load( std::istream& );
     
    
 
   - 
    
     
    
    
         bool load( const std::string& );
     
    
 
   - 
    
     
    
    
         bool xml( std::ostream& );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         DOMNodePtr getNode( const std::string& );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       private:
     
    
 
   - 
    
     
    
    
     
          pugi::xml_document m_pDoc;
     
    
 
   - 
    
     
    
    
     
        };
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
其中大多数函数不需要特别关心,我们只需要重点关心PUGIXML_DOMDocument类中的load()函数,这也是最重要+最复杂的函数。
  
   - 
    
     
    
    
     
      bool PUGIXML_DOMDocument::load( std::istream& stream )
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
           try 
     
    
 
   - 
    
     
    
    
     
            { 
     
    
 
   - 
    
     
    
    
             return m_pDoc.load(stream);
     
    
 
   - 
    
     
    
    
     
            } 
     
    
 
   - 
    
     
    
    
           catch( ... ) { return false; }
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
       
     
    
 
   - 
    
     
    
    
         bool PUGIXML_DOMDocument::load( const std::string& url )
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
           try 
     
    
 
   - 
    
     
    
    
     
            { 
     
    
 
   - 
    
     
    
    
             return m_pDoc.load_file(url.c_str());
     
    
 
   - 
    
     
    
    
     
            } 
     
    
 
   - 
    
     
    
    
           catch( ... ) { return false; }
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
  
 
 
这个函数就是对给定一个xml路径然后装载后返回一个pugi::xml_document的对象。
2.2 数据字典解析
上面的类实现了诸如FIX44.xml的载入处理,数据字典中定义了很多结构节点,比如fields,messages,groups等,DataDictionary.cpp是真正对这些xml文件进行解析的源文件。DataDictionary.h中部分源代码如下:
  
   - 
    
     
    
    
     
      class DataDictionary
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
       typedef std::set < int > MsgFields;
     
    
 
   - 
    
     
    
    
       typedef std::map < std::string, MsgFields > MsgTypeToField;
     
    
 
   - 
    
     
    
    
       typedef std::set < std::string > MsgTypes;
     
    
 
   - 
    
     
    
    
       typedef std::set < int > Fields;
     
    
 
   - 
    
     
    
    
       typedef std::map < int, bool > NonBodyFields;
     
    
 
   - 
    
     
    
    
       typedef std::vector< int > OrderedFields;
     
    
 
   - 
    
     
    
    
       typedef message_order OrderedFieldsArray;
     
    
 
   - 
    
     
    
    
       typedef std::map < int, TYPE::Type > FieldTypes;
     
    
 
   - 
    
     
    
    
       typedef std::set < std::string > Values;
     
    
 
   - 
    
     
    
    
       typedef std::map < int, Values > FieldToValue;
     
    
 
   - 
    
     
    
    
       typedef std::map < int, std::string > FieldToName;
     
    
 
   - 
    
     
    
    
       typedef std::map < std::string, int > NameToField;
     
    
 
   - 
    
     
    
    
       typedef std::map < std::pair < int, std::string > , std::string  > ValueToName;
     
    
 
   - 
    
     
    
    
       // while FieldToGroup structure seems to be overcomplicated
     
    
 
   - 
    
     
    
    
       // in reality it yields a lot of performance because:
     
    
 
   - 
    
     
    
    
       // 1) avoids memory copying;
     
    
 
   - 
    
     
    
    
       // 2) first lookup is done by comparing integers and not string objects
     
    
 
   - 
    
     
    
    
       // TODO: use hash_map with good hashing algorithm
     
    
 
   - 
    
     
    
    
       typedef std::map < std::string, std::pair < int, DataDictionary* > > FieldPresenceMap;
     
    
 
   - 
    
     
    
    
       typedef std::map < int, FieldPresenceMap > FieldToGroup;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      public:
     
    
 
   - 
    
     
    
    
       DataDictionary();
     
    
 
   - 
    
     
    
    
       DataDictionary( const DataDictionary& copy );
     
    
 
   - 
    
     
    
    
       DataDictionary( std::istream& stream ) throw( ConfigError );
     
    
 
   - 
    
     
    
    
       DataDictionary( const std::string& url ) throw( ConfigError );
     
    
 
   - 
    
     
    
    
       virtual ~DataDictionary();
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       void readFromURL( const std::string& url ) throw( ConfigError );
     
    
 
   - 
    
     
    
    
       void readFromDocument( DOMDocumentPtr pDoc ) throw( ConfigError );
     
    
 
   - 
    
     
    
    
       void readFromStream( std::istream& stream ) throw( ConfigError );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      ......
     
    
 
   - 
    
     
    
    
     
      };
     
    
 
   - 
    
     
    
    
     
      ....
     
    
 
  
 
 
可以看到DataDictionary类中定义了很多的std::map和std::vector,这些容器都是用来存储从FIX4X.xml文件中解析来的内容,一些映射,但是是否过于繁琐,我没有深究。
比如:
typedef std::map < int, std::string > FieldToName;
 
表示存储field和实际的字段名的映射,比如8对应BeginString;
typedef std::map < int, Values > FieldToValue;
 
表示枚举当中的int值跟实际的字段名的映射,比如:
  
   - 
    
     
    
    
     
      <field number='13' name='CommType' type='CHAR'>
     
    
 
   - 
    
     
    
    
     
         <value enum='1' description='PER_UNIT' />
     
    
 
   - 
    
     
    
    
     
         <value enum='2' description='PERCENTAGE' />
     
    
 
   - 
    
     
    
    
     
         <value enum='3' description='ABSOLUTE' />
     
    
 
   - 
    
     
    
    
     
         <value enum='4' description='4' />
     
    
 
   - 
    
     
    
    
     
         <value enum='5' description='5' />
     
    
 
   - 
    
     
    
    
     
         <value enum='6' description='POINTS_PER_BOND_OR_CONTRACT_SUPPLY_CONTRACTMULTIPLIER' />
     
    
 
   - 
    
     
    
    
     
        </field>
     
    
 
  
 
3代表ABSOLUTE;1代表PER_UNIT。
另外需要注意的成员函数readFrom*()系列,底层就是上一章中的类,进行xml的载入。
  
   - 
    
     
    
    
     
      void DataDictionary::readFromURL( const std::string& url )
     
    
 
   - 
    
     
    
    
       throw( ConfigError )
     
    
 
   - 
    
     
    
    
     
        {
     
    
 
   - 
    
     
    
    
     
          DOMDocumentPtr pDoc = DOMDocumentPtr(new PUGIXML_DOMDocument());
     
    
 
   - 
    
     
    
    
       
     
    
 
   - 
    
     
    
    
         if(!pDoc->load(url))
     
    
 
   - 
    
     
    
    
     
          ¦ throw ConfigError(url + ": Could not parse data dictionary file");
     
    
 
   - 
    
     
    
    
       
     
    
 
   - 
    
     
    
    
         try 
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
     
          ¦ readFromDocument( pDoc );
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
         catch( ConfigError& e ) 
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
     
          ¦ throw ConfigError( url + ": " + e.what() );
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
     
        }
     
    
 
   - 
    
     
    
    
       
     
    
 
   - 
    
     
    
    
       void DataDictionary::readFromStream( std::istream& stream )
     
    
 
   - 
    
     
    
    
       throw( ConfigError )
     
    
 
   - 
    
     
    
    
     
        {
     
    
 
   - 
    
     
    
    
     
      >*  DOMDocumentPtr pDoc = DOMDocumentPtr(new PUGIXML_DOMDocument());
     
    
 
   - 
    
     
    
    
       
     
    
 
   - 
    
     
    
    
         if(!pDoc->load(stream))
     
    
 
   - 
    
     
    
    
     
          ¦ throw ConfigError("Could not parse data dictionary stream");
     
    
 
   - 
    
     
    
    
       
     
    
 
   - 
    
     
    
    
         readFromDocument( pDoc );
     
    
 
   - 
    
     
    
    
     
        }
     
    
 
   - 
    
     
    
    
       
     
    
 
   - 
    
     
    
    
     
      >*void DataDictionary::readFromDocument( DOMDocumentPtr pDoc )
     
    
 
   - 
    
     
    
    
       throw( ConfigError )
     
    
 
   - 
    
     
    
    
     
        {
     
    
 
   - 
    
     
    
    
         // VERSION
     
    
 
   - 
    
     
    
    
     
          DOMNodePtr pFixNode = pDoc->getNode("/fix");
     
    
 
   - 
    
     
    
    
         if(!pFixNode.get())
     
    
 
   - 
    
     
    
    
     
      ...
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
到这里,数据字典的解析就完成了。简单的理解就是,读入xml文件,然后针对xml文件里的内容,把内容做成映射用map和vector存储。
2.3 数据字典存储
SessionSettings
  
   - 
    
     
    
    
     
      /// Container for setting dictionaries mapped to sessions.
     
    
 
   - 
    
     
    
    
     
      class SessionSettings
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
     
      public:
     
    
 
   - 
    
     
    
    
       SessionSettings() { m_resolveEnvVars = false; }
     
    
 
   - 
    
     
    
    
       SessionSettings( std::istream& stream, bool resolveEnvVars = false ) EXCEPT ( ConfigError );
     
    
 
   - 
    
     
    
    
       SessionSettings( const std::string& file, bool resolveEnvVars = false ) EXCEPT ( ConfigError );
     
    
 
   - 
    
     
    
    
     
      ''''''
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       typedef std::map < SessionID, Dictionary > Dictionaries;
     
    
 
   - 
    
     
    
    
     
        std::set < SessionID > getSessions() const;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      private:
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
        Dictionaries m_settings;
     
    
 
   - 
    
     
    
    
     
        Dictionary m_defaults;
     
    
 
   - 
    
     
    
    
     
      '''
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       friend std::istream& operator>>( std::istream&, SessionSettings& ) EXCEPT ( ConfigError );
     
    
 
   - 
    
     
    
    
       friend std::ostream& operator<<( std::ostream&, const SessionSettings& );
     
    
 
   - 
    
     
    
    
     
      };
     
    
 
  
 
 
是通过友元函数 operator >> 从任意的流中读取配置,通过一个sessonid的set和一个sessionid->dictionary的map,管理每个段。
三、Application
3.1 Application
若是须要使用QuickFIX开发FIX应用,则须要实现FIX::Application接口,并重载不一样FIX协议版本的MessageCracker::OnMessage接口,如FIX42::MessageCracker。
  
   - 
    
     
    
    
     
      class Application
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
     
      public:
     
    
 
   - 
    
     
    
    
     
        virtual ~Application() {};
     
    
 
   - 
    
     
    
    
     
        /// Notification of a session begin created
     
    
 
   - 
    
     
    
    
     
        virtual void onCreate( const SessionID& ) = 0;
     
    
 
   - 
    
     
    
    
     
       
     
    
 
   - 
    
     
    
    
     
        /// Notification of a session successfully logging on
     
    
 
   - 
    
     
    
    
     
        virtual void onLogon( const SessionID& ) = 0;
     
    
 
   - 
    
     
    
    
     
       
     
    
 
   - 
    
     
    
    
     
        /// Notification of a session logging off or disconnecting
     
    
 
   - 
    
     
    
    
     
        virtual void onLogout( const SessionID& ) = 0;
     
    
 
   - 
    
     
    
    
     
       
     
    
 
   - 
    
     
    
    
     
        /// Notification of admin message being sent to target
     
    
 
   - 
    
     
    
    
     
        virtual void toAdmin( Message&, const SessionID& ) = 0;
     
    
 
   - 
    
     
    
    
     
       
     
    
 
   - 
    
     
    
    
     
        /// Notification of app message being sent to target
     
    
 
   - 
    
     
    
    
     
        virtual void toApp( Message&, const SessionID& )
     
    
 
   - 
    
     
    
    
     
        EXCEPT ( DoNotSend ) = 0;
     
    
 
   - 
    
     
    
    
     
       
     
    
 
   - 
    
     
    
    
     
        /// Notification of admin message being received from target
     
    
 
   - 
    
     
    
    
     
        virtual void fromAdmin( const Message&, const SessionID& )
     
    
 
   - 
    
     
    
    
     
        EXCEPT ( FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon ) = 0;
     
    
 
   - 
    
     
    
    
     
       
     
    
 
   - 
    
     
    
    
     
        /// Notification of app message being received from target
     
    
 
   - 
    
     
    
    
     
        virtual void fromApp( const Message&, const SessionID& )
     
    
 
   - 
    
     
    
    
     
        EXCEPT ( FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType ) = 0;
     
    
 
   - 
    
     
    
    
     
      };
     
    
 
  
 
 
  onCreate:当Fix Session创建时调用。
 onLogon:当Fix Session登陆成功时调用。
 onLogout:当Fix Session退出时调用。
 fromAdmin:当收到一个Admin类型消息时调用。
 fromApp:当收到一个不属于Admin 类型消息时调用。
 toAdmin:当发送一个admin类型消息调用。
 toApp:当发送一个非admin(业务类型)消息调用。
admin一般是服务提供方,app是客户端
 3.2 MessageCracker
除了实现FIX::Application接口,还需要重新实现FIX::MessageCracker从具体的FIX协议版本实现继承而来的onMessage方法,crack接口就可以根据message类型匹配到你实现的具体onMessage接口上。
  
   - 
    
     
    
    
       void crack( const Message& message,
     
    
 
   - 
    
     
    
    
                   const SessionID& sessionID )
     
    
 
   - 
    
     
    
    
     
        {
     
    
 
   - 
    
     
    
    
         const FIX::BeginString& beginString = 
     
    
 
   - 
    
     
    
    
           FIELD_GET_REF( message.getHeader(), BeginString );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         crack( message, sessionID, beginString );
     
    
 
   - 
    
     
    
    
     
        }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       void crack( const Message& message,
     
    
 
   - 
    
     
    
    
                   const SessionID& sessionID,
     
    
 
   - 
    
     
    
    
                   const BeginString& beginString )
     
    
 
   - 
    
     
    
    
     
        {
     
    
 
   - 
    
     
    
    
         if ( beginString == BeginString_FIX40 )
     
    
 
   - 
    
     
    
    
     
            ((FIX40::MessageCracker&)(*this)).crack((const FIX40::Message&) message, sessionID);
     
    
 
   - 
    
     
    
    
         else if ( beginString == BeginString_FIX41 )
     
    
 
   - 
    
     
    
    
     
            ((FIX41::MessageCracker&)(*this)).crack((const FIX41::Message&) message, sessionID);
     
    
 
   - 
    
     
    
    
         else if ( beginString == BeginString_FIX42 )
     
    
 
   - 
    
     
    
    
     
            ((FIX42::MessageCracker&)(*this)).crack((const FIX42::Message&) message, sessionID);
     
    
 
   - 
    
     
    
    
         else if ( beginString == BeginString_FIX43 )
     
    
 
   - 
    
     
    
    
     
            ((FIX43::MessageCracker&)(*this)).crack((const FIX43::Message&) message, sessionID);
     
    
 
   - 
    
     
    
    
         else if ( beginString == BeginString_FIX44 )
     
    
 
   - 
    
     
    
    
     
            ((FIX44::MessageCracker&)(*this)).crack((const FIX44::Message&) message, sessionID);
     
    
 
   - 
    
     
    
    
         else if ( beginString == BeginString_FIXT11 )
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
           if( message.isAdmin() )
     
    
 
   - 
    
     
    
    
     
            {
     
    
 
   - 
    
     
    
    
     
              ((FIXT11::MessageCracker&)(*this)).crack((const FIXT11::Message&) message, sessionID);
     
    
 
   - 
    
     
    
    
     
            }
     
    
 
   - 
    
     
    
    
           else
     
    
 
   - 
    
     
    
    
     
            {
     
    
 
   - 
    
     
    
    
     
      '''
     
    
 
   - 
    
     
    
    
     
            }
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
     
        }
     
    
 
  
 
 
四、*Factory
就是这两行:
FIX::FileStoreFactory storeFactory( settings );
FIX::ScreenLogFactory logFactory( settings );
逻辑比较简单,就是读了上文介绍的settings,然后存下来,存储结构如下:
std::string m_path;
SessionSettings m_settings;
五、initiator/Acceptor
也就是这一行 initiator = new FIX::SocketInitiator( application, storeFactory, settings, logFactory );
这俩大概差不多,先看一个。
主要代码如下:
  
   - 
    
     
    
    
     
      /**
     
    
 
   - 
    
     
    
    
     
       * Base for classes which act as an acceptor for incoming connections.
     
    
 
   - 
    
     
    
    
     
       *
     
    
 
   - 
    
     
    
    
     
       * Most users will not need to implement one of these. The default
     
    
 
   - 
    
     
    
    
     
       * SocketAcceptor implementation will be used in most cases.
     
    
 
   - 
    
     
    
    
     
       */
     
    
 
   - 
    
     
    
    
     
      class Acceptor
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
     
      public:
     
    
 
   - 
    
     
    
    
     
      ''''''
     
    
 
   - 
    
     
    
    
       Acceptor( Application&, MessageStoreFactory&,
     
    
 
   - 
    
     
    
    
                 const SessionSettings&, LogFactory& ) EXCEPT ( ConfigError );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       virtual ~Acceptor();
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      ''''''
     
    
 
   - 
    
     
    
    
       /// Poll the acceptor
     
    
 
   - 
    
     
    
    
       bool poll( double timeout = 0.0 ) EXCEPT ( ConfigError, RuntimeError );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       /// Stop acceptor.
     
    
 
   - 
    
     
    
    
       void stop( bool force = false );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       /// Check to see if any sessions are currently logged on
     
    
 
   - 
    
     
    
    
       bool isLoggedOn();
     
    
 
   - 
    
     
    
    
       Session* getSession( const std::string& msg, Responder& );
     
    
 
   - 
    
     
    
    
       const std::set<SessionID>& getSessions() const { return m_sessionIDs; }
     
    
 
   - 
    
     
    
    
       Session* getSession( const SessionID& sessionID ) const;
     
    
 
   - 
    
     
    
    
       const Dictionary* const getSessionSettings( const SessionID& sessionID ) const;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       bool has( const SessionID& id )
     
    
 
   - 
    
     
    
    
     
        { return m_sessions.find( id ) != m_sessions.end(); }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       bool isStopped() { return m_stop; }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       Application& getApplication() { return m_application; }
     
    
 
   - 
    
     
    
    
       MessageStoreFactory& getMessageStoreFactory()
     
    
 
   - 
    
     
    
    
     
        { return m_messageStoreFactory; }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
      private:
     
    
 
   - 
    
     
    
    
     
      ''''''
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       static THREAD_PROC startThread( void* p );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       typedef std::set < SessionID > SessionIDs;
     
    
 
   - 
    
     
    
    
       typedef std::map < SessionID, Session* > Sessions;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
        thread_id m_threadid;
     
    
 
   - 
    
     
    
    
     
        Sessions m_sessions;
     
    
 
   - 
    
     
    
    
     
        SessionIDs m_sessionIDs;
     
    
 
   - 
    
     
    
    
     
        Application& m_application;
     
    
 
   - 
    
     
    
    
     
        MessageStoreFactory& m_messageStoreFactory;
     
    
 
   - 
    
     
    
    
     
      protected:
     
    
 
   - 
    
     
    
    
     
        SessionSettings m_settings;
     
    
 
   - 
    
     
    
    
     
      private:
     
    
 
   - 
    
     
    
    
     
        LogFactory* m_pLogFactory;
     
    
 
   - 
    
     
    
    
     
        Log* m_pLog;
     
    
 
   - 
    
     
    
    
     
        NullLog m_nullLog;
     
    
 
   - 
    
     
    
    
       bool m_firstPoll;
     
    
 
   - 
    
     
    
    
       bool m_stop;
     
    
 
   - 
    
     
    
    
     
      };
     
    
 
  
 
 
基本包含了之前介绍的大部分类,如
Session相关的(SessionSettings/set<SessionID>/map<SessionID, Session*>)、
Application(用于接收并处理消息的)、LogFactory(写日志的对象)
5.1 init
功能就是把配置的每一个session初始化,很简单。
  
   - 
    
     
    
    
     
      void Acceptor::initialize() EXCEPT ( ConfigError )
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
     
        std::set < SessionID > sessions = m_settings.getSessions();
     
    
 
   - 
    
     
    
    
     
        std::set < SessionID > ::iterator i;
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       if ( !sessions.size() )
     
    
 
   - 
    
     
    
    
         throw ConfigError( "No sessions defined" );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       SessionFactory factory( m_application, m_messageStoreFactory,
     
    
 
   - 
    
     
    
    
     
       m_pLogFactory );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       for ( i = sessions.begin(); i != sessions.end(); ++i )
     
    
 
   - 
    
     
    
    
     
        {
     
    
 
   - 
    
     
    
    
         if ( m_settings.get( *i ).getString( CONNECTION_TYPE ) == "acceptor" )
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
     
            m_sessionIDs.insert( *i );
     
    
 
   - 
    
     
    
    
     
            m_sessions[ *i ] = factory.create( *i, m_settings.get( *i ) );
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
     
        }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       if ( !m_sessions.size() )
     
    
 
   - 
    
     
    
    
         throw ConfigError( "No sessions defined for acceptor" );
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
5.2 start
这一行:Acceptor/initiator->start();
- 调用 
SocketAcceptor::onInitialize()创建 socket 句柄,进行监听端口。 - 启动线程,调用 
SocketAcceptor::onStart(),检测对端的连接 
  
   - 
    
     
    
    
     
      void Acceptor::start() EXCEPT ( ConfigError, RuntimeError )
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
     
          m_stop = false;
     
    
 
   - 
    
     
    
    
         onConfigure( m_settings );
     
    
 
   - 
    
     
    
    
         onInitialize( m_settings );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
          HttpServer::startGlobal( m_settings );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         if( !thread_spawn( &startThread, this, m_threadid ) )
     
    
 
   - 
    
     
    
    
             throw RuntimeError("Unable to spawn thread");
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
其他的操作大同小异,可以自己阅读
5.3 SocketAcceptor::onInitialize
主要功能就是对每个session设置监听
  
   - 
    
     
    
    
     
      void SocketAcceptor::onInitialize(const SessionSettings& s)
     
    
 
   - 
    
     
    
    
         EXCEPT ( RuntimeError )
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
         short port = 0;
     
    
 
   - 
    
     
    
    
         try
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
     
              m_pServer = new SocketServer(1);
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
              std::set<SessionID> sessions = s.getSessions();
     
    
 
   - 
    
     
    
    
     
              std::set<SessionID>::iterator i = sessions.begin();
     
    
 
   - 
    
     
    
    
             for( ; i != sessions.end(); ++i )
     
    
 
   - 
    
     
    
    
     
              {
     
    
 
   - 
    
     
    
    
                 const Dictionary& settings = s.get( *i );
     
    
 
   - 
    
     
    
    
     
                  port = (short)settings.getInt( SOCKET_ACCEPT_PORT );
     
    
 
   - 
    
     
    
    
     
      ''''''
     
    
 
   - 
    
     
    
    
                 // 管理监听端口与 SeesionID 的对应关系
     
    
 
   - 
    
     
    
    
     
                  m_portToSessions[port].insert(*i);
     
    
 
   - 
    
     
    
    
                 // 为每个监听的端口创建 Socket 句柄: socket_handle
     
    
 
   - 
    
     
    
    
     
                  m_pServer->add( port, reuseAddress, noDelay, sendBufSize, rcvBufSize );
     
    
 
   - 
    
     
    
    
     
              }
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
         catch( SocketException& e )
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
     
      ''''''
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
5.4
5.2中的第二步调用
  
   - 
    
     
    
    
     
      THREAD_PROC Acceptor::startThread( void* p )
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
     
        Acceptor * pAcceptor = static_cast < Acceptor* > ( p );
     
    
 
   - 
    
     
    
    
     
        pAcceptor->onStart();
     
    
 
   - 
    
     
    
    
       return 0;
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
六、session
回顾所有我们浏览的代码,唯独没有介绍session,最后来看一下。
6.1 session创建
用factory(初始化心跳、session)
  
   - 
    
     
    
    
     
      Session* SessionFactory::create( const SessionID& sessionID,
     
    
 
   - 
    
     
    
    
                                      const Dictionary& settings ) EXCEPT ( ConfigError )
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
     
          std::string connectionType = settings.getString( CONNECTION_TYPE );
     
    
 
   - 
    
     
    
    
         if ( connectionType != "acceptor" && connectionType != "initiator" )
     
    
 
   - 
    
     
    
    
             throw ConfigError( "Invalid ConnectionType" );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         if( connectionType == "acceptor" && settings.has(SESSION_QUALIFIER) )
     
    
 
   - 
    
     
    
    
             throw ConfigError( "SessionQualifier cannot be used with acceptor." );
     
    
 
   - 
    
     
    
    
         // 初始化心跳
     
    
 
   - 
    
     
    
    
         HeartBtInt heartBtInt( 0 );
     
    
 
   - 
    
     
    
    
         if ( connectionType == "initiator" )
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
     
              heartBtInt = HeartBtInt( settings.getInt( HEARTBTINT ) );
     
    
 
   - 
    
     
    
    
             if ( heartBtInt <= 0 ) throw ConfigError( "Heartbeat must be greater than zero" );
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
         // 创建 Session 对象
     
    
 
   - 
    
     
    
    
     
          SmartPtr<Session> pSession;
     
    
 
   - 
    
     
    
    
     
          pSession.reset( new Session( m_application, m_messageStoreFactory,
     
    
 
   - 
    
     
    
    
     
                                      sessionID, dataDictionaryProvider, sessionTimeRange,
     
    
 
   - 
    
     
    
    
     
                                      heartBtInt, m_pLogFactory ) );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         return pSession.release();
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
其中session对象内属性太多,挑一些重要的看:
Application(会话)、
SessionID(标识唯一session)、
m_sessionTime/m_logonTime(主要用于之前讲的24小时重新连接,对应配置)、
m_senderDefaultApplVerID/m_targetDefaultApplVerID(发送端/接收端默 Fix 协议版本号)、
m_state(session状态)、
send()(发送消息函数)、
next()(处理收到的消息,比较重要)
6.2 next()
精简过的代码如下
  
   - 
    
     
    
    
     
      void Session::next( const Message& message, const UtcTimeStamp& timeStamp, bool queued )
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
       const Header& header = message.getHeader();
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       try
     
    
 
   - 
    
     
    
    
     
        {
     
    
 
   - 
    
     
    
    
         //检查时间
     
    
 
   - 
    
     
    
    
         if ( !checkSessionTime(timeStamp) )
     
    
 
   - 
    
     
    
    
     
            { reset(); return; }
     
    
 
   - 
    
     
    
    
         //获取类型,下面根据类型分处理方法
     
    
 
   - 
    
     
    
    
         const MsgType& msgType = FIELD_GET_REF( header, MsgType );
     
    
 
   - 
    
     
    
    
         //校验时间
     
    
 
   - 
    
     
    
    
         const BeginString& beginString = FIELD_GET_REF( header, BeginString );
     
    
 
   - 
    
     
    
    
         // make sure these fields are present
     
    
 
   - 
    
     
    
    
         FIELD_THROW_IF_NOT_FOUND( header, SenderCompID );
     
    
 
   - 
    
     
    
    
         FIELD_THROW_IF_NOT_FOUND( header, TargetCompID );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         if ( beginString != m_sessionID.getBeginString() )
     
    
 
   - 
    
     
    
    
           throw UnsupportedVersion();
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         const DataDictionary& sessionDataDictionary = 
     
    
 
   - 
    
     
    
    
     
              m_dataDictionaryProvider.getSessionDataDictionary(m_sessionID.getBeginString());
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         if( m_sessionID.isFIXT() && message.isApp() )
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
     
            ApplVerID applVerID = m_targetDefaultApplVerID;
     
    
 
   - 
    
     
    
    
     
            header.getFieldIfSet(applVerID);
     
    
 
   - 
    
     
    
    
           const DataDictionary& applicationDataDictionary = 
     
    
 
   - 
    
     
    
    
     
              m_dataDictionaryProvider.getApplicationDataDictionary(applVerID);
     
    
 
   - 
    
     
    
    
     
            DataDictionary::validate( message, &sessionDataDictionary, &applicationDataDictionary );
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
         else
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
     
            sessionDataDictionary.validate( message );
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         if ( msgType == MsgType_Logon )
     
    
 
   - 
    
     
    
    
           nextLogon( message, timeStamp );
     
    
 
   - 
    
     
    
    
         else if ( msgType == MsgType_Heartbeat )
     
    
 
   - 
    
     
    
    
           nextHeartbeat( message, timeStamp );
     
    
 
   - 
    
     
    
    
         else if ( msgType == MsgType_TestRequest )
     
    
 
   - 
    
     
    
    
           nextTestRequest( message, timeStamp );
     
    
 
   - 
    
     
    
    
         else if ( msgType == MsgType_SequenceReset )
     
    
 
   - 
    
     
    
    
           nextSequenceReset( message, timeStamp );
     
    
 
   - 
    
     
    
    
         else if ( msgType == MsgType_Logout )
     
    
 
   - 
    
     
    
    
           nextLogout( message, timeStamp );
     
    
 
   - 
    
     
    
    
         else if ( msgType == MsgType_ResendRequest )
     
    
 
   - 
    
     
    
    
           nextResendRequest( message, timeStamp );
     
    
 
   - 
    
     
    
    
         else if ( msgType == MsgType_Reject )
     
    
 
   - 
    
     
    
    
           nextReject( message, timeStamp );
     
    
 
   - 
    
     
    
    
         else
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
           if ( !verify( message ) ) return ;
     
    
 
   - 
    
     
    
    
           //内含Session::doTargetTooLow() 来处理序列号过小的消息
     
    
 
   - 
    
     
    
    
           // Session::doTargetTooHigh() 来处理序列号过大的消息
     
    
 
   - 
    
     
    
    
     
            m_state.incrNextTargetMsgSeqNum();
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
     
        }
     
    
 
   - 
    
     
    
    
       ''''''
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       if( !queued )
     
    
 
   - 
    
     
    
    
         nextQueued( timeStamp );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
       if( isLoggedOn() )
     
    
 
   - 
    
     
    
    
         next();
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
经过各种检查后,根据type调用不同的处理方法,然后操作queue进行下次操作。
这里调用的函数太多了,挑一个复杂的看一下。
6.3 nextResendRequest()
当收到 type是ResendRequest 消息时,回调用nextResendRequest() 处理:
  
   - 
    
     
    
    
     
      void Session::nextResendRequest(const Message& resendRequest, const UtcTimeStamp& timeStamp)
     
    
 
   - 
    
     
    
    
     
      {
     
    
 
   - 
    
     
    
    
         // ...
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         // 从缓存拿出需要重传的消息片段(从MessageStore中的消息,如果是FileStore,那么就会从文件中取出)
     
    
 
   - 
    
     
    
    
     
          std::vector < std::string > messages;
     
    
 
   - 
    
     
    
    
     
          m_state.get( beginSeqNo, endSeqNo, messages );
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         // ...
     
    
 
   - 
    
     
    
    
         for ( i = messages.begin(); i != messages.end(); ++i )
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
             // 重新计算消息的校验和
     
    
 
   - 
    
     
    
    
             // ...
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
             if ( Message::isAdminMsgType( msgType ) )
     
    
 
   - 
    
     
    
    
     
              {
     
    
 
   - 
    
     
    
    
                 // 跳过管理消息
     
    
 
   - 
    
     
    
    
                 if ( !begin ) begin = msgSeqNum;
     
    
 
   - 
    
     
    
    
     
              }
     
    
 
   - 
    
     
    
    
             else
     
    
 
   - 
    
     
    
    
     
              {
     
    
 
   - 
    
     
    
    
                 // 在 resend 里会回调 Application::toApp
     
    
 
   - 
    
     
    
    
                 if ( resend( msg ) )
     
    
 
   - 
    
     
    
    
     
                  {
     
    
 
   - 
    
     
    
    
                     // 有需要跳过的管理消息,则用一条 SeqReset-GapFill 消息替代
     
    
 
   - 
    
     
    
    
                     if ( begin ) generateSequenceReset( begin, msgSeqNum );
     
    
 
   - 
    
     
    
    
                     
     
    
 
   - 
    
     
    
    
                     // 发送应用消息
     
    
 
   - 
    
     
    
    
                     send( msg.toString(messageString) );
     
    
 
   - 
    
     
    
    
     
                      m_state.onEvent( "Resending Message: "
     
    
 
   - 
    
     
    
    
     
                              + IntConvertor::convert( msgSeqNum ) );
     
    
 
   - 
    
     
    
    
     
                      begin = 0;
     
    
 
   - 
    
     
    
    
     
                      appMessageJustSent = true;
     
    
 
   - 
    
     
    
    
     
                  }
     
    
 
   - 
    
     
    
    
                 else
     
    
 
   - 
    
     
    
    
     
                  { if ( !begin ) begin = msgSeqNum; }
     
    
 
   - 
    
     
    
    
     
              }
     
    
 
   - 
    
     
    
    
     
              current = msgSeqNum + 1;
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         // 结尾还有需要跳过的管理消息,需要用一条 SeqReset-GapFill 消息替代
     
    
 
   - 
    
     
    
    
         if ( begin )
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
             generateSequenceReset( begin, msgSeqNum + 1 );
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
         // 序列号同步。为什么在重传借宿后还需要再发送一个 SeqReset-GapFill 消息?
     
    
 
   - 
    
     
    
    
         if ( endSeqNo > msgSeqNum )
     
    
 
   - 
    
     
    
    
     
          {
     
    
 
   - 
    
     
    
    
     
              endSeqNo = EndSeqNo(endSeqNo + 1);
     
    
 
   - 
    
     
    
    
             int next = m_state.getNextSenderMsgSeqNum();
     
    
 
   - 
    
     
    
    
             if( endSeqNo > next )
     
    
 
   - 
    
     
    
    
     
                  endSeqNo = EndSeqNo(next);
     
    
 
   - 
    
     
    
    
             if ( appMessageJustSent )
     
    
 
   - 
    
     
    
    
     
                  beginSeqNo = msgSeqNum + 1;
     
    
 
   - 
    
     
    
    
             generateSequenceReset( beginSeqNo, endSeqNo );
     
    
 
   - 
    
     
    
    
     
          }
     
    
 
   - 
    
     
    
    
      
     
    
 
   - 
    
     
    
    
     
          resendRequest.getHeader().getField( msgSeqNum );
     
    
 
   - 
    
     
    
    
         if( !isTargetTooHigh(msgSeqNum) && !isTargetTooLow(msgSeqNum) )
     
    
 
   - 
    
     
    
    
     
              m_state.incrNextTargetMsgSeqNum();
     
    
 
   - 
    
     
    
    
     
      }
     
    
 
  
 
 
作者修行尚浅,这里只是浅读一下源码,由于使用经验不足,肯定对一些知识的认识不足,以后多加改正。
文章来源: fantianzuo.blog.csdn.net,作者:兔老大RabbitMQ,版权归原作者所有,如需转载,请联系作者。
原文链接:fantianzuo.blog.csdn.net/article/details/126285739
- 点赞
 - 收藏
 - 关注作者
 
            
           
评论(0)