Flink证券项目(七)数据采集

举报
Maynor学长 发表于 2022/11/25 15:32:31 2022/11/25
【摘要】 项目进度: 1. 数据采集 1.1. 沪市行情实时文本数据采集沪市行情服务端会在交易时间段内对外实时广播行情数据,实时行情数据以txt文本的形式写入存储网关(FTP服务),这里我们采用Flume自定source的方式实时采集FTP服务器上的行情数据源。 1.1.1. 沪市采集数据流程 1.1.2. 导入依赖<dependency> <groupId>org.apache.flume</gro...

项目进度:

image-20221125152357493

1. 数据采集

1.1. 沪市行情实时文本数据采集

沪市行情服务端会在交易时间段内对外实时广播行情数据,实时行情数据以txt文本的形式写入存储网关(FTP服务),这里我们采用Flume自定source的方式实时采集FTP服务器上的行情数据源。

1.1.1. 沪市采集数据流程

img

1.1.2. 导入依赖

<dependency>
	<groupId>org.apache.flume</groupId>
	<artifactId>flume-ng-core</artifactId>
	<version>1.6.0</version>
</dependency>
<dependency>
	<groupId>org.apache.ftpserver</groupId>
	<artifactId>ftpserver-core</artifactId>
	<version>1.1.1</version>
</dependency>
<dependency>
	<groupId>commons-net</groupId>
	<artifactId>commons-net</artifactId>
	<version>3.1</version>
</dependency>

1.1.3. 配置文件

参考“4.资料配置文件sse-ftp-source.conf”,

上传配置文件到服务器flume安装目录:/export/servers/flume-1.6.0-cdh5.14.0-bin/conf

代码略

1.1.4. 自定义source

在 cn.itcast.sse目录下创建SseQuotSource接口

(1)实现目标

使用flume自定义source从FTP服务器采集实时行情文本文件

(2)实现步骤

1.实现自定义source接口

2.初始化参数(初始化source参数和线程池)

3.判断是否是交易时间段

4.异步处理

5.设置延时时间

1.1.1.1. 自定义source接口

代码略

1.1.1.2. 初始化方法

代码略

1.1.1.3. 交易时间段判断

证券市场连续竞价阶段

1、上海交易所:9:30 — 11:30;13:00 — 15:00

2、深圳交易所:9:30 — 11:30;13:00 — 15:00

1.1.1.1.1. 交易时间段工具类

在cn.itcast.util目录下创建DateTimeUtil

开发步骤:

1.新建Calendar对象,设置日期

2.设置开市时间

3.设置闭市时间

1.1.1.1.2. 时间段判断

1.1.1.4. 异步处理

ftp上的实时行情文本数据会每秒钟被覆盖一次,为了防止数据丢失和更好的处理性能,我们采用多线程ThreadPoolExecutor异步处理的方式解析文本数据。

开发步骤:

1.创建异步线程task

2.下载行情文件

3.解析并发送数据

数据转换成avro

数据序列化

4.发送数据到channel

1.1.1.1.1. 创建线程
代码略
1.1.1.1.2. 下载行情文件

开发步骤:

  1. 初始化ftp连接

(1)设置IP和Port

(2)设置登陆用户名和密码

(3) 设置编码格式

(4)判断是否连接成功(FTPReply)

2.切换工作目录,设置被动模式

3.获取工作目录的文件信息列表

4.输出文件

5.退出,返回成功状态

注意:FTP参数设置:

//被动模式,服务端开放端口,用于数据传输

ftpClient.enterLocalPassiveMode();

//禁用服务端参与的验证,如果不禁用服务端会获取主机IP与提交的host进行匹配,不一致时会报错

ftpClient.setRemoteVerificationEnabled(false);

1.1.1.1.3. 解析并发送数据

解析的文本数据会转换成Avro对象数据,需要将Avro对象:

将“4.资料avroAvro对象SseAvro.java”copy工程目录:cn.itcast.avro

开发步骤:

1.读取文件获取行数据

2.获取首行,判断市场行情状态

3.数据转换成avro对象

4.数据序列化

5.发送数据到管道

1.1.1.1.1.1. 转换成avro对象
private SseAvro transform(String[] arr) {
        SseAvro sseAvro = new SseAvro();
        sseAvro.setMdStreamID(arr[0].trim());
        sseAvro.setSecurityID(arr[1].trim());
        sseAvro.setSymbol(arr[2].trim());
        sseAvro.setTradeVolume(Long.valueOf(arr[3].trim()));
        sseAvro.setTotalValueTraded(Long.valueOf(arr[4].trim()));
        sseAvro.setPreClosePx(Double.valueOf(arr[5].trim()));
        sseAvro.setOpenPrice(Double.valueOf(arr[6].trim()));
        sseAvro.setHighPrice(Double.valueOf(arr[7].trim()));
        sseAvro.setLowPrice(Double.valueOf(arr[8].trim()));
        sseAvro.setTradePrice(Double.valueOf(arr[9].trim()));
        sseAvro.setClosePx(Double.valueOf(arr[10].trim()));
        sseAvro.setTradingPhaseCode("T11");
        sseAvro.setTimestamp(new Date().getTime());
    return sseAvro;
}
1.1.1.1.1.2. 数据序列化
private byte[] serialize(SseAvro sseAvro) {

    DatumWriter<SseAvro> specificDatumWriter = new SpecificDatumWriter<>(SseAvro.getSchema());
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

    BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(byteArrayOutputStream, null);
    try {
        specificDatumWriter.write(sseAvro, binaryEncoder);
    } catch (IOException e) {
        e.printStackTrace();
    }

    return byteArrayOutputStream.toByteArray();
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。