Flink证券项目(六) 客户端程序

举报
Maynor学长 发表于 2022/11/25 15:32:02 2022/11/25
【摘要】 客户端程序客户端负责接收服务端广播的实时行情数据,并对数据预处理加工,发送到消息队列Kafka中。1.获取服务端数据在cn.itcast.szse包下创建客户端接收数据对象:SocketClient开发步骤:1.创建main方法2.建立socket连接,获取流数据3.读文件缓存成交量和成交金额4.解析行数据,数据转换发送kafka代码略2.读取文件缓存成交数据解析文件获取成交量和成交金额数...

客户端程序

客户端负责接收服务端广播的实时行情数据,并对数据预处理加工,发送到消息队列Kafka中。
1.获取服务端数据
在cn.itcast.szse包下创建客户端接收数据对象:SocketClient
开发步骤:
1.创建main方法
2.建立socket连接,获取流数据
3.读文件缓存成交量和成交金额
4.解析行数据,数据转换
发送kafka

代码略

2.读取文件缓存成交数据
解析文件获取成交量和成交金额数据,并封装到Map<String, Map<String, Long>> map中。

开发步骤:
1.读取本地文件(深市)
2.解析行数据获取成交量和成交金额
3.封装数据到map
4.关流

注意:需要加载外部数据源基础文件,拷贝”4.资料\需求文档\数据源接口文档\数据源文件“中的szse-index.txt和szse-stock.txt到自定义本地磁盘文件目录下:E:\export\servers\tmp\socket

/**
 * 深市行情实时采集数据
 */
public class SocketClient {

    private static Map<String, Map<String, Long>> map = new HashMap<>();

    //1.创建main方法
public static void main(String[] args) throws IOException {
        
        //2.socket连接,获取流数据
        Socket socket = new Socket(host, port);
        InputStream inputStream = socket.getInputStream();
        DataInputStream dataInputStream = new DataInputStream(inputStream);
	
	   //3.读文件缓存成交量和成交金额
        parseFile();
    }

    public static void parseFile() {
        try {
		 //1.读取本地文件(个股、指数)
            //个股
            BufferedReader brSzseStock = new BufferedReader(new InputStreamReader(new FileInputStream(new File("E:\\export\\servers\\tmp\\socket\\szse-stock.txt")), "UTF-8"));
            
		 //2.解析行数据获取成交量和成交金额
    	    String lineTxtStock = null;
    	    while ((lineTxtStock = brSzseStock.readLine()) != null) {
    	        String[] arr = lineTxtStock.split("\\|");
    	        String code = arr[1].trim();
    	        long tradeVol = new Long(arr[3].trim()).longValue();
    	        long tradeAmt = Double.valueOf(arr[4].trim()).longValue();
			//3.封装数据到map
    	        Map<String, Long> volAmtMap = new HashMap<>();
    	        volAmtMap.put("tradeVol", tradeVol);
    	        volAmtMap.put("tradeAmt", tradeAmt);
    	        map.put(code, volAmtMap);
    	    }
			
		 //4.关流
    	    brSzseStock.close();
		
    	    //指数
    	    BufferedReader brSzseIndex = new BufferedReader(new InputStreamReader(new FileInputStream(new File("E:\\export\\servers\\tmp\\socket\\szse-index.txt")), "UTF-8"));
    	    //2.解析行数据获取成交量和成交金额
    	    String lineTxtIndex = null;
    	    while ((lineTxtIndex = brSzseIndex.readLine()) != null) {
    	        String[] arr = lineTxtIndex.split("\\|");
    	        Map<String, Long> volAmtMap = new HashMap<>();
    	        String code = arr[1].trim();
    	        Long tradeVol = new Long(arr[3].trim()).longValue();
    	        long tradeAmt = Double.valueOf(arr[4].trim()).longValue();
			//3.封装数据到map
    	        volAmtMap.put("tradeVol", tradeVol);
    	        volAmtMap.put("tradeAmt", tradeAmt);
    	        map.put(code, volAmtMap);
    	    }
		//4.关流
    	    brSzseIndex.close();
    	} catch (Exception e) {
    	    System.err.println("errors :" + e);
    	}
    }
}

3.解析数据
第一步:在main方法中添加解析数据代码

//3.解析行数据,数据转换

while (true) {
    String readUTF = dataInputStream.readUTF();
    //数据转换成avro
    StockAvro stockAvro = transform(readUTF);
    System.out.println(stockAvro);
}

第二步:新建数据转换方法transform

开发步骤:
1.拷贝成交量和成交价数组
2.获取随机浮动成交量和价格
3.字符串切割、计算最新价
4.获取缓存的成交量/金额
计算总成交量/金额
6.缓存总成交量/金额
7.获取最高价和最低价(和最新价比较)
8.封装结果数据

//随机浮动成交价格系数
private static Double[] price = new Double[]{0.1, 0.11, 0.12, 0.13, 0.14, 0.15, 0.16, 0.17, 0.18, 0.19, 0.2, -0.1, -0.11, -0.12, -0.13, -0.14, -0.15, -0.16, -0.17, -0.18, -0.19, -0.2};

//随机浮动成交量
private static int[] volumn = new int[]{50, 80, 110, 140, 170, 200, 230, 260, 290, 320, 350, 380, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300};

 /**

 * 字符串转换成avro对象
   */
   private static SzsekAvro transform(String str) {

   //1.字符串切割
   String[] arr = str.split("\\|");
   //2.获取随机浮动成交量和价格
   Random random = new Random();
   int priceIndex = random.nextInt(price.length);
   Double priceRandom = price[priceIndex];
   int volumnIndex = random.nextInt(volumn.length);
   int volumnRandom = volumn[volumnIndex];

   //3.计算最新价
   BigDecimal tradePriceBase = new BigDecimal(arr[9].trim());
   BigDecimal tradePrice = tradePriceBase.multiply(new BigDecimal(1 + priceRandom))
           .setScale(2, BigDecimal.ROUND_HALF_UP);

   //4.获取缓存的成交量/金额
   Map<String, Long> volAmtMap = map.get(arr[1].trim());
   Long tradeVol = 0l;
   Long tradeAmt = 0l;
   tradeVol = volAmtMap.get("tradeVol");
   tradeAmt = volAmtMap.get("tradeAmt");

   //计算总成交量/金额
   //总成交量
   Long tradeVolNew = 0l;
   if (tradeVol != 0) {
       tradeVolNew = tradeVol + volumnRandom;
   }
   //总成交金额
   BigDecimal amt = tradePrice.multiply(new BigDecimal(volumnRandom)).setScale(2, RoundingMode.HALF_UP);
   Long tradeAmtNew = tradeAmt + amt.longValue();

   //缓存最新成交量和成交金额
   volAmtMap.put("tradeVol", tradeVolNew);
   volAmtMap.put("tradeAmt", tradeAmtNew);
   map.put(arr[1].trim(), volAmtMap);

   //6.获取最高价和最低价
   //计算最高价
   BigDecimal highPrice = new BigDecimal(arr[7].trim());
   //最高价和最新价比较
   if (tradePrice.compareTo(highPrice) == 1) {
       highPrice = tradePrice;
   }

   //计算最低价
   BigDecimal lowPrice = new BigDecimal(arr[8].trim());
   //最低价和最新价比较
   if (tradePrice.compareTo(lowPrice) == -1) {
       lowPrice = tradePrice;
   }

   //7.封装结果数据
   SzsekAvro szsekAvro = new SzsekAvro();
   szsekAvro.setMdStreamID(arr[0].trim());
   szsekAvro.setSecurityID(arr[1].trim());
   szsekAvro.setSymbol(arr[2].trim());
   szsekAvro.setTotalValueTraded(tradeAmtNew);
   szsekAvro.setTradeVolume(tradeVolNew);
   szsekAvro.setPreClosePx(Double.valueOf(arr[5].trim()));
   szsekAvro.setOpenPrice(Double.valueOf(arr[6].trim()));
   szsekAvro.setHighPrice(highPrice.doubleValue());
   szsekAvro.setLowPrice(lowPrice.doubleValue());
   szsekAvro.setTradePrice(tradePrice.doubleValue());
   szsekAvro.setClosePx(tradePrice.doubleValue());
   szsekAvro.setTradingPhaseCode(arr[11].trim());
   szsekAvro.setTimestamp(new Date().getTime());
   szsekAvro.setMarket("szse");
   return szsekAvro;
   }

4.发送Kafka
数据发送Kafka需要提前创建Kafka生产者对象,并提前创建好topic。
4.1.Kafka生产者对象
开发步骤:
1.创建类,泛型参数继承avro基类
2.设置生产者参数
3.自定avro序列化
4.添加发送数据方法

生产者参数:
bootstrap.servers :broker地址
acks :0,1和-1
retries:重试次数
batch.size:批量发送大小默认16384 (16kB)
linger.ms: 定时发送1ms
buffer.memory: 缓存大小33554432
key.serializer:key序列化
value.serializer: value序列化

代码略

4.2.自定义Avro序列化
在cn.itcast.avro目录创建序列化对象:AvroSerializer
开发步骤:
1.泛型参数继承avro基类,实现序列化接口
2.重写序列化方法
3.新建字节数组输出流对象
4.获取二进制对象BinaryEncoder
输出数据

代码略

数据发送
在while循环中发送转换之后的数据

while (true) {
    String readUTF = dataInputStream.readUTF();
    //数据转换成avro
    StockAvro stockAvro = transform(readUTF);
    //7.发送kafka
    kafkaPro.sendData("szse", stockAvro);
}

6.打包部署
1.导入依赖

代码略

2.打包

启动命令
服务端:nohub -java -jar szse-server.jar &
客户端:nohub -java -jar szse-client.jar &
注意:
1.需要新建topic:szse
bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --zookeeper node01:2181 --topic szse
2.如果部署在服务器上需要在代码中修改文件解析的服务器路径
BufferedReader brSzseStock = new BufferedReader(new InputStreamReader(new FileInputStream(new File(“E:\export\servers\tmp\socket\szse-stock.txt”)), “UTF-8”));

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。