Flink证券项目(六) 客户端程序
客户端程序
客户端负责接收服务端广播的实时行情数据,并对数据预处理加工,发送到消息队列Kafka中。
1.获取服务端数据
在cn.itcast.szse包下创建客户端接收数据对象:SocketClient
开发步骤:
1.创建main方法
2.建立socket连接,获取流数据
3.读文件缓存成交量和成交金额
4.解析行数据,数据转换
发送kafka
代码略
2.读取文件缓存成交数据
解析文件获取成交量和成交金额数据,并封装到Map<String, Map<String, Long>> map中。
开发步骤:
1.读取本地文件(深市)
2.解析行数据获取成交量和成交金额
3.封装数据到map
4.关流
注意:需要加载外部数据源基础文件,拷贝”4.资料\需求文档\数据源接口文档\数据源文件“中的szse-index.txt和szse-stock.txt到自定义本地磁盘文件目录下:E:\export\servers\tmp\socket
/**
* 深市行情实时采集数据
*/
public class SocketClient {
private static Map<String, Map<String, Long>> map = new HashMap<>();
//1.创建main方法
public static void main(String[] args) throws IOException {
//2.socket连接,获取流数据
Socket socket = new Socket(host, port);
InputStream inputStream = socket.getInputStream();
DataInputStream dataInputStream = new DataInputStream(inputStream);
//3.读文件缓存成交量和成交金额
parseFile();
}
public static void parseFile() {
try {
//1.读取本地文件(个股、指数)
//个股
BufferedReader brSzseStock = new BufferedReader(new InputStreamReader(new FileInputStream(new File("E:\\export\\servers\\tmp\\socket\\szse-stock.txt")), "UTF-8"));
//2.解析行数据获取成交量和成交金额
String lineTxtStock = null;
while ((lineTxtStock = brSzseStock.readLine()) != null) {
String[] arr = lineTxtStock.split("\\|");
String code = arr[1].trim();
long tradeVol = new Long(arr[3].trim()).longValue();
long tradeAmt = Double.valueOf(arr[4].trim()).longValue();
//3.封装数据到map
Map<String, Long> volAmtMap = new HashMap<>();
volAmtMap.put("tradeVol", tradeVol);
volAmtMap.put("tradeAmt", tradeAmt);
map.put(code, volAmtMap);
}
//4.关流
brSzseStock.close();
//指数
BufferedReader brSzseIndex = new BufferedReader(new InputStreamReader(new FileInputStream(new File("E:\\export\\servers\\tmp\\socket\\szse-index.txt")), "UTF-8"));
//2.解析行数据获取成交量和成交金额
String lineTxtIndex = null;
while ((lineTxtIndex = brSzseIndex.readLine()) != null) {
String[] arr = lineTxtIndex.split("\\|");
Map<String, Long> volAmtMap = new HashMap<>();
String code = arr[1].trim();
Long tradeVol = new Long(arr[3].trim()).longValue();
long tradeAmt = Double.valueOf(arr[4].trim()).longValue();
//3.封装数据到map
volAmtMap.put("tradeVol", tradeVol);
volAmtMap.put("tradeAmt", tradeAmt);
map.put(code, volAmtMap);
}
//4.关流
brSzseIndex.close();
} catch (Exception e) {
System.err.println("errors :" + e);
}
}
}
3.解析数据
第一步:在main方法中添加解析数据代码
//3.解析行数据,数据转换
while (true) {
String readUTF = dataInputStream.readUTF();
//数据转换成avro
StockAvro stockAvro = transform(readUTF);
System.out.println(stockAvro);
}
第二步:新建数据转换方法transform
开发步骤:
1.拷贝成交量和成交价数组
2.获取随机浮动成交量和价格
3.字符串切割、计算最新价
4.获取缓存的成交量/金额
计算总成交量/金额
6.缓存总成交量/金额
7.获取最高价和最低价(和最新价比较)
8.封装结果数据
//随机浮动成交价格系数
private static Double[] price = new Double[]{0.1, 0.11, 0.12, 0.13, 0.14, 0.15, 0.16, 0.17, 0.18, 0.19, 0.2, -0.1, -0.11, -0.12, -0.13, -0.14, -0.15, -0.16, -0.17, -0.18, -0.19, -0.2};
//随机浮动成交量
private static int[] volumn = new int[]{50, 80, 110, 140, 170, 200, 230, 260, 290, 320, 350, 380, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300};
/**
* 字符串转换成avro对象
*/
private static SzsekAvro transform(String str) {
//1.字符串切割
String[] arr = str.split("\\|");
//2.获取随机浮动成交量和价格
Random random = new Random();
int priceIndex = random.nextInt(price.length);
Double priceRandom = price[priceIndex];
int volumnIndex = random.nextInt(volumn.length);
int volumnRandom = volumn[volumnIndex];
//3.计算最新价
BigDecimal tradePriceBase = new BigDecimal(arr[9].trim());
BigDecimal tradePrice = tradePriceBase.multiply(new BigDecimal(1 + priceRandom))
.setScale(2, BigDecimal.ROUND_HALF_UP);
//4.获取缓存的成交量/金额
Map<String, Long> volAmtMap = map.get(arr[1].trim());
Long tradeVol = 0l;
Long tradeAmt = 0l;
tradeVol = volAmtMap.get("tradeVol");
tradeAmt = volAmtMap.get("tradeAmt");
//计算总成交量/金额
//总成交量
Long tradeVolNew = 0l;
if (tradeVol != 0) {
tradeVolNew = tradeVol + volumnRandom;
}
//总成交金额
BigDecimal amt = tradePrice.multiply(new BigDecimal(volumnRandom)).setScale(2, RoundingMode.HALF_UP);
Long tradeAmtNew = tradeAmt + amt.longValue();
//缓存最新成交量和成交金额
volAmtMap.put("tradeVol", tradeVolNew);
volAmtMap.put("tradeAmt", tradeAmtNew);
map.put(arr[1].trim(), volAmtMap);
//6.获取最高价和最低价
//计算最高价
BigDecimal highPrice = new BigDecimal(arr[7].trim());
//最高价和最新价比较
if (tradePrice.compareTo(highPrice) == 1) {
highPrice = tradePrice;
}
//计算最低价
BigDecimal lowPrice = new BigDecimal(arr[8].trim());
//最低价和最新价比较
if (tradePrice.compareTo(lowPrice) == -1) {
lowPrice = tradePrice;
}
//7.封装结果数据
SzsekAvro szsekAvro = new SzsekAvro();
szsekAvro.setMdStreamID(arr[0].trim());
szsekAvro.setSecurityID(arr[1].trim());
szsekAvro.setSymbol(arr[2].trim());
szsekAvro.setTotalValueTraded(tradeAmtNew);
szsekAvro.setTradeVolume(tradeVolNew);
szsekAvro.setPreClosePx(Double.valueOf(arr[5].trim()));
szsekAvro.setOpenPrice(Double.valueOf(arr[6].trim()));
szsekAvro.setHighPrice(highPrice.doubleValue());
szsekAvro.setLowPrice(lowPrice.doubleValue());
szsekAvro.setTradePrice(tradePrice.doubleValue());
szsekAvro.setClosePx(tradePrice.doubleValue());
szsekAvro.setTradingPhaseCode(arr[11].trim());
szsekAvro.setTimestamp(new Date().getTime());
szsekAvro.setMarket("szse");
return szsekAvro;
}
4.发送Kafka
数据发送Kafka需要提前创建Kafka生产者对象,并提前创建好topic。
4.1.Kafka生产者对象
开发步骤:
1.创建类,泛型参数继承avro基类
2.设置生产者参数
3.自定avro序列化
4.添加发送数据方法
生产者参数:
bootstrap.servers :broker地址
acks :0,1和-1
retries:重试次数
batch.size:批量发送大小默认16384 (16kB)
linger.ms: 定时发送1ms
buffer.memory: 缓存大小33554432
key.serializer:key序列化
value.serializer: value序列化
代码略
4.2.自定义Avro序列化
在cn.itcast.avro目录创建序列化对象:AvroSerializer
开发步骤:
1.泛型参数继承avro基类,实现序列化接口
2.重写序列化方法
3.新建字节数组输出流对象
4.获取二进制对象BinaryEncoder
输出数据
代码略
数据发送
在while循环中发送转换之后的数据
while (true) {
String readUTF = dataInputStream.readUTF();
//数据转换成avro
StockAvro stockAvro = transform(readUTF);
//7.发送kafka
kafkaPro.sendData("szse", stockAvro);
}
6.打包部署
1.导入依赖
代码略
2.打包
启动命令
服务端:nohub -java -jar szse-server.jar &
客户端:nohub -java -jar szse-client.jar &
注意:
1.需要新建topic:szse
bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --zookeeper node01:2181 --topic szse
2.如果部署在服务器上需要在代码中修改文件解析的服务器路径
BufferedReader brSzseStock = new BufferedReader(new InputStreamReader(new FileInputStream(new File(“E:\export\servers\tmp\socket\szse-stock.txt”)), “UTF-8”));
- 点赞
- 收藏
- 关注作者
评论(0)