Flink证券项目(三) 数据源

举报
Maynor学长 发表于 2022/11/24 15:48:33 2022/11/24
【摘要】 1.1. 数据源 1.1.1. 实时数据源接口 1.1.1.1. 文本数据上证行情数据接口文档文件名:竞价行情文件(sse.txt)接收方:[新债券业务管理系统,新公司业务管理系统,行情监测系统,期权风控系统,指数公司,信息中心,信息公司,港交所,新基金业务管理系统]功能描述:本文件为行情文件接口。对于竞价撮合平台,行情文件名为sse.txt,当日开市至交易结束期间实时发送。产品类型:债券...

1.1. 数据源

1.1.1. 实时数据源接口

1.1.1.1. 文本数据

上证行情数据接口文档

文件名:竞价行情文件(sse.txt)

接收方:[新债券业务管理系统,新公司业务管理系统,行情监测系统,期权风控系统,指数公司,信息中心,信息公司,港交所,新基金业务管理系统]

功能描述:本文件为行情文件接口。对于竞价撮合平台,行情文件名为sse.txt,当日开市至交易结束期间实时发送。

产品类型:债券、基金、股票和权证

时间和频率:每日开市前至交易结束期间实时发送。对于信息公司和竞价系统,在每个交易日8:30前行情服务可用;也可以从存储网关(FTP)/home/ftptest获取,在每个交易日8:30前就绪。

文件路径:由行情文件落地至目录/home/ftptest,以供接收方读取

数据格式:

文件头定义,第一行特殊记录:

序号 域名 字段名 字段类型 描述
1 BeginString 起始标识符 C6 HEADER
2 Version 版本 C8 MTP1.00
3 BodyLength 数据长度 N10 计算出的长度字段分隔符后面的字节数(不包含本字段后面的分隔符),包含其他字段后面的分隔符、换行符。
4 TotNumTradeReports 文件体记录数 N5 文件体记录数
5 MDReportID 行情序号 N8 行情文件序号(预留,暂不填)
6 SenderCompID 发送方 C6 发送方标示符,取XSHG01
7 MDTime 行情时间 C21 行情时间,格式为YYYYMMDD-HH:MM:SS.000
8 MDUpdateType 发送方式 N1 0 = 快照Full refresh1 = 增量Incremental(暂不支持)
9 MDSesStatus 市场行情状态 C8 全市场行情状态:该字段左起每位表示特定的含义,无定义则填空格。第1位:‘S’表示全市场启动期间(开市前),‘T’表示全市场处于交易期间(含中间休市), ‘E’表示全市场处于闭市期间。第2位:‘1’表示开盘集合竞价结束标志,未结束取‘0’。第3位:‘0’表示市场行情结束标志,未结束取‘1’。
扩展区域 不定 系统应能支持记录尾部扩展新的字段。

文件体定义,多条记录。

行情数据类型取值标识字母MD加类型编号,当取值为MD001时,文件体记录格式如下:

序号 域名 字段名 字段类型 描述
1 mdStreamID 行情数据类型 C5 行情数据类型标识符,取值MD001 表示指数行情数据格式类型,其中指数目前实际精度为4位小数;
2 securityID 产品代码 C6 指数代码
3 symbol 产品名称 C8 指数简称
4 tradeVolume 成交数量 N16 参与计算相应指数的交易数量,股票指数交易数量单位是100股,基金指数的交易数量单位是100份,债券指数的交易数量单位是手。
5 totalValueTraded 成交金额 N16(2) 参与计算相应指数的成交金额,单位均为人民币元。
6 preClosePx 昨日收盘价 N11(4) 前收盘指数
7 openPrice 今日开盘价 N11(4) 今开盘指数
8 highPrice 最高价 N11(4) 最高指数
9 lowPrice 最低价 N11(4) 最低指数
10 tradePrice 最新价 N11(4) 最新指数
11 closePx 今收盘价 N11(4) 今收盘指数,无取值取空格
12 tradingPhaseCode 指数实时阶段及标志 C8 该字段左起每位表示特定的含义,无定义则填空格。
13 timestamp 时间戳 N13 timestamp

行情数据类型取值标识字母MD加类型编号,当取值为MD002、MD003、MD004时,文件体记录格式如下:

序号 字段 字段中文 字段类型 描述
1 mdStreamID 行情数据类型 C5 行情数据类型标识符,取值MD002 表示股票(A、B股)行情数据格式类型;MD003 表示债券行情数据格式类型;MD004 表示基金行情数据格式类型;
2 securityID 产品代码 C6 证券代码
3 symbol 产品名称 C8 证券简称
4 tradeVolume 成交数量 N16
5 totalValueTraded 成交金额 N16(2)
6 preClosePx 昨日收盘价 N11(3)
7 openPrice 今日开盘价 N11(3)
8 highPrice 最高价 N11(3)
9 lowPrice 最低价 N11(3)
10 tradePrice 最新价 N11(3) 最新成交价
11 closePx 今收盘价 N11(3)
12 buyPrice1 申买价一 N11(3) 当前买入价
13 buyVolume1 申买量一 N12
14 sellPrice1 申卖价一 N11(3) 当前卖出价
15 sellVolume1 申卖量一 N12
16 buyPrice2 申买价二 N11(3)
17 buyVolume2 申买量二 N12
18 sellPrice2 申卖价二 N11(3)
19 sellVolume2 申卖量二 N12
20 buyPrice3 申买价三 N11(3)
21 buyVolume3 申买量三 N12
22 sellPrice3 申卖价三 N11(3)
23 sellVolume3 申卖量三 N12
24 buyPrice4 申买价四 N11(3)
25 buyVolume4 申买量四 N12
26 sellPrice4 申卖价四 N11(3)
27 sellVolume4 申卖量四 N12
28 buyPrice5 申买价五 N11(3)
29 buyVolume5 申买量五 N12
30 sellPrice5 申卖价五 N11(3)
31 sellVolume5 申卖量五 N12
32 preCloseIOPV 基金T-1日收盘时刻IOPV N11(3) 可选字段,仅当MDStreamID=MD004时存在该字段。
33 IOPV 基金IOPV N11(3) 可选字段,仅当MDStreamID=MD004时存在该字段。
34 tradingPhaseCode 产品实时阶段及标志 C8 全市场行情状态:该字段左起每位表示特定的含义,无定义则填空格。第1位:‘S’表示全市场启动期间(开市前),‘T’表示全市场处于交易期间(含中间休市), ‘E’表示全市场处于闭市期间。第2位:‘1’表示开盘集合竞价结束标志,未结束取‘0’。第3位:‘0’表示市场行情结束标志,未结束取‘1’。
35 timestamp 时间戳 C12 HH:MM:SS.000
扩展区域 不定 系统应能支持记录尾部扩展新的字段。

备注:

(一)行情文件包含文件头、文件体两个部分;

(二)注意行情文件可扩展性,文件记录尾部可能随时增加扩展字段,接收处理方应能向下兼容处理,即增加扩展字段后,对新增字段无需识别处理的用户,不需要升级系统;注意根据文件体首字段MDStreamID取值解析不同的文件体记录格式;

(三)文件体中,对于不同的行情数据类型,分别定义记录格式,接收处理方应能根据行情数据类型识别处理;

(四)文件体记录按行情数据类型递增,同一行情数据类型按证券代码递增;

(五)对于暂停上市的股票,文件中不包含该股票的记录。发行期内对于未设定开始上市交易的产品,文件中不包含该产品的记录。对于摘牌后的产品,文件中不包含该产品的记录;

(六)如文件中数值字段取值超过约定格式最大值,取最大值,如N10取9999999999;

(七)前收盘价格,在开市之前即行发布。

(八)当产品代码为债券(国债、企债、可转债)时,由于债券交易的数量以手为单位,成交金额为该债券每笔成交的价格数量10的总和;

当产品代码为席位质押式国债回购代码201、席位质押式企业债回购代码202以及账户质押式国债回购代码204时,成交金额为100成交数量10;

当产品代码为买断式国债回购代码203时,成交金额为其基础产品昨日收盘价成交数量10;

(九)对于产品价格、金额数据单位,除B股为美元外,其他为人民币元。对于产品数量数据单位,股票为股,基金、权证为份,债券与回购为手。对于产品TradeVolume成交数量单位,股票为股,基金为份,债券与回购为手,权证为100份;对于指数价格、金额、数量单位,详见表格具体定义说明;

(十)在集合竞价时段内,当前买入价和当前卖出价中同时为虚拟开盘参考价格,即根据集合竞价算法计算得出的虚拟撮合价格。同时申买量一和申卖量一为行情发布时刻的虚拟匹配量。申买量二为行情发布时刻的买方虚拟未匹配量。申卖量二为行情发布时刻的卖方虚拟未匹配量;

(十一)在开盘集合竞价之后的短暂休市和中午休市期间同样揭示各档买卖价格数量等全部信息;

(十二)临时停牌期间不揭示五档行情;

(十三)字段无意义或无该字段行情数据时,字符填空格,数值填0;

(十四)具体产品代码,参见相关发文通知,并参考以上处理。

样例文件如下:

HEADER|MTP1.00 | 1258021| 3290| |XSHG01|20181218-14:36:40.230|0|E1111 MD001|000001|上证指数| 28798| 1487411410.20| 4677.4600| 73569.8641| 74628.6808| 73569.8641| 73750.4214| 73750.4214| |11:19:36.550MD001|000002|A股指数| 28797| 1487411133.00| 4908.6860| 77394.4566| 78508.5020| 77394.4566| 77584.4348| 77584.4348| |11:19:36.550MD001|000003|B股指数| 1| 277.20| 271.4220| 288.3062| 288.3062| 288.2435| 288.2435| 288.2435| |11:19:36.550

img

1.1.1.2. 流数据

深证行情数据接口文档

序号 字段 字段中文 字段类型 描述
1 mdStreamID 行情数据类型 C6 个股标识:010指数标识:900
2 securityID 产品代码 C6
3 symbol 产品名称 C8
4 tradeVolume 成交数量 N16
5 totalValueTraded 成交金额 N16(2)
6 preClosePx 昨日收盘价 N11(4)
7 openPrice 今日开盘价 N11(4)
8 highPrice 最高价 N11(4)
9 lowPrice 最低价 N11(4)
10 tradePrice 最新价 N11(4)
11 closePx 今收盘价 N11(4)
12 tradingPhaseCode 实时阶段及标志 C8 全市场行情状态:该字段左起每位表示特定的含义,无定义则填空格。第1位:‘S’表示全市场启动期间(开市前),‘T’表示全市场处于交易期间(含中间休市), ‘E’表示全市场处于闭市期间。第2位:‘1’表示开盘集合竞价结束标志,未结束取‘0’。第3位:‘0’表示市场行情结束标志,未结束取‘1’。
13 timestamp 时间戳 N13
样例:

900|399001|深证成指|      28798|  1487411410.20|  4677.4600| 73569.8641| 74628.6808| 73569.8641| 73750.4214| 73750.4214|   T01  |11:19:36.550

010|000001|平安银行  |    316124|   4041900.93 |   12.78 |   12.88|    15.46|    10.3|

12.77   |    12.77|  T01  |11:19:36.550

1.2. 高性能数据传输中间件

在企业级大数据流处理项目中,往往在项目数据源处需要面临实时海量数据的采集。采集数据的性能一般与网络带宽、机器硬件、数据量等因素有直接关系;当其他因素是固定的,这里我们只考虑数据量的话,那么数据量的传输和存储性能是我们首先需要面对和解决的。

由此我们引入了Avro数据序列化框架,来解决数据的传输性能问题。

1.2.1. Apache Avro介绍

Apache Avro(以下简称 Avro)是一个数据序列化系统,是一种与编程语言无关的序列化格式,是提供一种共享数据文件的方式。Avro是Hadoop中的一个子项目,Avro是一个基于二进制数据传输高性能的中间件。Avro可以做到将数据进行序列化,适用于远程或本地大批量数据交互。在传输的过程中Avro对数据二进制序列化后节约数据存储空间和网络传输带宽。

序列化就是将对象转换成二进制流,相应的反序列化就是将二进制流再转换成对应的对象。因此,Avro就是用来在传输数据之前,将对象转换成二进制流,然后此二进制流达到目标地址后,Avro再将二进制流转换成对象。

Avro特点:

·1.丰富的数据结构

·2.一个紧凑的,快速的,二进制的数据格式

·3.一个容器文件,来存储持久化数据

·4.远程过程调用(RPC)

·5.简单的动态语言集成。

·6.Avro模式是使用JSON定义的 。这有助于以已经具有JSON库的语言实现。

JSON是一种轻量级的数据传输格式,对于大数据集,JSON数据会显示力不从心,因为JSON的格式是key:value型,每条记录都要附上key的名字,有的时候,光key消耗的空间甚至会超过value所占空间,这对空间的浪费十分严重,尤其是对大型数据集来说,因为它不仅不够紧凑,还要重复地加上key信息,不仅会造成存储空间上的浪费,更会增加了数据传输的压力,从而给集群增加负担,进而影响整个集群的吞吐量。而采用Avro数据序列化系统可以比较好的解决此问题,因为用Avro序列化后的文件由schema和真实内容组成,schema只是数据的元数据,相当于JSON数据的key信息,schema单独存放在一个JSON文件中,这样一来,数据的元数据只存了一次,相比JSON数据格式的文件,大大缩小了存储容量。从而使得Avro文件可以更加紧凑地组织数据。

官网地址:http://avro.apache.org/docs/current/gettingstartedjava.html

1.2.2. Avro文件规范

AVRO的Schema是用JSON的格式表示的,Schema可以用

JSON String 来命名一个定义的类型

JSON 对象,形式如:

引用
{"namespace": "cn.itcast.demo", "type": "record", "name": "User", "fields": [   {"name": "name", "type": "string"},   {"name": "age",  "type": ["int", "null"]},   {"name": "address", "type": ["string", "null"]} ]}

1.2.2.1. 原生类型

原生类型如下所示:

null: 表示没有值

boolean: 表示一个二进制布尔值

int: 表示32位有符号整数

long: 表示64位有符号整数

float: 表示32位的单精度浮点数

double: 表示64位双精度浮点数

bytes: 表示8位的无符号字节序列

string: Unicode 编码的字符序列

总共就这8种原生数据类型,这些原生数据类型均没有明确的属性。

1.2.2.2. 复杂类型

AVRO支持6种复杂类型,分别是:records, enums, arrays, maps, unions,fixed,这里我门着重讲解Recoeds类型。

1.Records

Records使用类型名称“record”,并且支持三个必选属性。

type: 必有属性。

name: 必有属性,是一个JSON string,提供了记录的名字。

namespace,也是一个JSON string,用来限定和修饰name属性。

doc: 可选属性,是一个JSON string,为使用这个Schema的用户提供文档。

aliases: 可选属性,是JSON的一个string数组,为这条记录提供别名。

fields: 必选属性,是一个JSON数组,数组中列举了所有的field。

每一个field都是一个JSON对象,并且具有如下属性:

(1)name: 必选属性,field的名字,是一个JSON string。例如:

 "fields": [

   {"name": "name", "type": "string"},

   {"name": "age",  "type": ["int", "null"]},

   {"name": "address", "type": ["string", "null"]}

 ]

(2)doc: 可选属性,为使用此Schema的用户提供了描述此field的文档。

(3)type: 必选属性,定义Schema的一个JSON对象,或者是命名一条记录定义的JSON string。

(4)default: 可选属性,即field的默认值,当读到缺少这个field的实例时用到。默认值的允许的范围由这个field的Schama的类型决定。

order: 可选属性,指定这个field如何影响record的排序。有效的可选值为“ascending”(默认),“descending"和"ignore”

alias: JSON的string数组,为这个field提供别名。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。