Flink证券项目(七)数据采集
项目进度:
1. 数据采集
1.1. 沪市行情实时文本数据采集
沪市行情服务端会在交易时间段内对外实时广播行情数据,实时行情数据以txt文本的形式写入存储网关(FTP服务),这里我们采用Flume自定source的方式实时采集FTP服务器上的行情数据源。
1.1.1. 沪市采集数据流程
1.1.2. 导入依赖
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.ftpserver</groupId>
<artifactId>ftpserver-core</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.1</version>
</dependency>
1.1.3. 配置文件
参考“4.资料配置文件sse-ftp-source.conf”,
上传配置文件到服务器flume安装目录:/export/servers/flume-1.6.0-cdh5.14.0-bin/conf
代码略
1.1.4. 自定义source
在 cn.itcast.sse目录下创建SseQuotSource接口
(1)实现目标
使用flume自定义source从FTP服务器采集实时行情文本文件
(2)实现步骤
1.实现自定义source接口
2.初始化参数(初始化source参数和线程池)
3.判断是否是交易时间段
4.异步处理
5.设置延时时间
1.1.1.1. 自定义source接口
代码略
1.1.1.2. 初始化方法
代码略
1.1.1.3. 交易时间段判断
证券市场连续竞价阶段
1、上海交易所:9:30 — 11:30;13:00 — 15:00
2、深圳交易所:9:30 — 11:30;13:00 — 15:00
1.1.1.1.1. 交易时间段工具类
在cn.itcast.util目录下创建DateTimeUtil
开发步骤:
1.新建Calendar对象,设置日期
2.设置开市时间
3.设置闭市时间
1.1.1.1.2. 时间段判断
1.1.1.4. 异步处理
ftp上的实时行情文本数据会每秒钟被覆盖一次,为了防止数据丢失和更好的处理性能,我们采用多线程ThreadPoolExecutor异步处理的方式解析文本数据。
开发步骤:
1.创建异步线程task
2.下载行情文件
3.解析并发送数据
数据转换成avro
数据序列化
4.发送数据到channel
1.1.1.1.1. 创建线程
代码略
1.1.1.1.2. 下载行情文件
开发步骤:
- 初始化ftp连接
(1)设置IP和Port
(2)设置登陆用户名和密码
(3) 设置编码格式
(4)判断是否连接成功(FTPReply)
2.切换工作目录,设置被动模式
3.获取工作目录的文件信息列表
4.输出文件
5.退出,返回成功状态
注意:FTP参数设置:
//被动模式,服务端开放端口,用于数据传输
ftpClient.enterLocalPassiveMode();
//禁用服务端参与的验证,如果不禁用服务端会获取主机IP与提交的host进行匹配,不一致时会报错
ftpClient.setRemoteVerificationEnabled(false);
1.1.1.1.3. 解析并发送数据
解析的文本数据会转换成Avro对象数据,需要将Avro对象:
将“4.资料avroAvro对象SseAvro.java”copy工程目录:cn.itcast.avro
开发步骤:
1.读取文件获取行数据
2.获取首行,判断市场行情状态
3.数据转换成avro对象
4.数据序列化
5.发送数据到管道
1.1.1.1.1.1. 转换成avro对象
private SseAvro transform(String[] arr) {
SseAvro sseAvro = new SseAvro();
sseAvro.setMdStreamID(arr[0].trim());
sseAvro.setSecurityID(arr[1].trim());
sseAvro.setSymbol(arr[2].trim());
sseAvro.setTradeVolume(Long.valueOf(arr[3].trim()));
sseAvro.setTotalValueTraded(Long.valueOf(arr[4].trim()));
sseAvro.setPreClosePx(Double.valueOf(arr[5].trim()));
sseAvro.setOpenPrice(Double.valueOf(arr[6].trim()));
sseAvro.setHighPrice(Double.valueOf(arr[7].trim()));
sseAvro.setLowPrice(Double.valueOf(arr[8].trim()));
sseAvro.setTradePrice(Double.valueOf(arr[9].trim()));
sseAvro.setClosePx(Double.valueOf(arr[10].trim()));
sseAvro.setTradingPhaseCode("T11");
sseAvro.setTimestamp(new Date().getTime());
return sseAvro;
}
1.1.1.1.1.2. 数据序列化
private byte[] serialize(SseAvro sseAvro) {
DatumWriter<SseAvro> specificDatumWriter = new SpecificDatumWriter<>(SseAvro.getSchema());
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(byteArrayOutputStream, null);
try {
specificDatumWriter.write(sseAvro, binaryEncoder);
} catch (IOException e) {
e.printStackTrace();
}
return byteArrayOutputStream.toByteArray();
}
- 点赞
- 收藏
- 关注作者
评论(0)