实时即未来,大数据项目车联网之实时ETL任务消费数据
1 实时ETL任务消费数据
1 消费数据解析逻辑
创建流式环境,设置检查点、kafka分区发现、任务重启策略,数据积压
根据kafka属性配置创建FlinkKafkaConsumer,消费kafka数据
根据kafka数据获得DataStream,进行json解析
消费数据中的数据解析逻辑:
n 得到解析成功的数据,称为正确数据
u 原始文本为一行可成功解析的json格式数据,且数据中存在不可或缺字段vin、terminalTime
n 得到解析失败的数据,称为异常数据
u json格式不正确;缺少必要字段,导致数据格式不正确的原因:
网络原因,车辆行驶在信号差的地方,数据传输不完整;
数据传输过程中,数据会传TSP造成数据丢失;
数据采集终端设备故障,导致数据部分丢失
数据入库,正确数据入hive与hbase库各一份,错误数据入hive
n 先入hdfs,再关联hive表
/
* 需求:原始数据ETL操作
* flink消费kafka数据,将消费出来的数据进行转换、清洗、过滤以后,正常的数据需要写入到hbase和hdfs,异常的数据写入到hdfs中
* 1)正常数据写入hdfs和hbase
* 2)异常数据写入到hbase
*/
public class KafkaSourceDataTask {
/
* 入口方法
* @param args
*/
public static void main(String[] args) throws Exception {
/
* 实现步骤:
* 1)初始化flink流式处理的开发环境
* 2)设置按照事件时间处理数据(划分窗口或者添加水印都需要事件时间)
* 3)开启checkpoint
* 3.1:设置每隔30秒钟开启checkpoint
* 3.2:设置检查点的model,exactly-once,保证数据消费一次,数据不重复消费
* 3.3:设置两次checkpoint时间间隔,避免两次间隔太近导致频繁checkpoint而出现业务处理能力下降
* 3.4:设置checkpoint的超时时间
* 3.5:设置checkpoint最大的尝试次数,同一个时间有几个checkpoint并行执行
* 3.6:设置checkpoint取消的时候,是否保留checkpoint,checkpoint默认会在job取消的时候删除
* 3.7:设置执行job过程中,保存检查点错误时,job不失败
* 3.8:设置检查点存储的位置,使用rocksDBStateBackend,存储到本地+hdfs分布式文件,增量检查点
* 4)设置任务的重启策略(固定延迟重启策略、失败率重启策略、无重启策略)
* 5)创建flink消费kafka数据的对象,指定kafka的参数信息
* 5.1:设置kafka集群地址
* 5.2:设置消费者组id
* 5.3:设置kafka的分区感知(动态监测)
* 5.4:设置key和value的反序列化
* 5.5:设置自动递交offset位置策略
* 5.6:创建kafka的消费者实例
* 5.7:设置自动递交offset到保存到检查点
* 6)将kafka消费者对象添加到环境中
* 7)将json字符串解析成对象
* 8)获取到异常的数据
* 9)获取到正常的数据
* 10)将异常的数据写入到hdfs中(StreamingFileSink、BucketingSink)
* StreamingFileSink是flink1.10的新特性
* 11)将正常的数据写入到hdfs中
* 12)将正常的数据写入到hbase中
* 13)启动作业,运行任务
*/
//加载conf.properties配置文件,返回ParameterTool工具类对象
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(BaseTask.class.getClassLoader().getResourceAsStream("conf.properties"));
//TODO 1)初始化flink流式处理的开发环境
System.setProperty("HADOOP_USER_NAME", "root");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置全局的参数
env.getConfig().setGlobalJobParameters(parameterTool);
//TODO 2)设置按照事件时间处理数据(划分窗口或者添加水印都需要事件时间)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//TODO 3)开启checkpoint
//TODO 3.1:设置每隔30秒钟开启checkpoint
env.enableCheckpointing(30*1000);
//TODO 3.2:设置检查点的model,exactly-once,保证数据消费一次,数据不重复消费
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//TODO 3.3:设置两次checkpoint时间间隔,避免两次间隔太近导致频繁checkpoint而出现业务处理能力下降
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);
//TODO 3.4:设置checkpoint的超时时间
env.getCheckpointConfig().setCheckpointTimeout(20000);
//TODO 3.5:设置checkpoint最大的尝试次数,同一个时间有几个checkpoint并行执行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//TODO 3.6:设置checkpoint取消的时候,是否保留checkpoint,checkpoint默认会在job取消的时候删除
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//TODO 3.7:设置执行job过程中,保存检查点错误时,job不失败
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
//TODO 3.8:设置检查点存储的位置,使用rocksDBStateBackend,存储到本地+hdfs分布式文件,增量检查点
String bashHdfsUri = parameterTool.getRequired("hdfsUri");
try {
env.setStateBackend(new RocksDBStateBackend(bashHdfsUri+"/flink/checkpoint/"+KafkaSourceDataTask.class.getSimpleName()));
} catch (IOException e) {
e.printStackTrace();
}
//TODO 4)设置任务的重启策略(固定延迟重启策略、失败率重启策略、无重启策略)
env.setRestartStrategy(RestartStrategies.noRestart());
//TODO 5)创建flink消费kafka数据的对象,指定kafka的参数信息
Properties props = new Properties();
//TODO 5.1:设置kafka集群地址
props.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
//TODO 5.2:设置消费者组id
props.setProperty("group.id", "KafkaSourceDataTask04");
//TODO 5.3:设置kafka的分区感知(动态监测)
props.setProperty("flink.partition-discovery.interval-millis", "30000");
//TODO 5.5:设置自动递交offset位置策略
props.setProperty("auto.offset.reset", parameterTool.get("auto.offset.reset", "earliest"));
//5.不自动提交偏移量,交给flink的checkpoint处理哦
props.setProperty("enable.auto.commit", parameterTool.get("enable.auto.commit", "false"));
//TODO 5.6:创建kafka的消费者实例
FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<String>(
parameterTool.getRequired("kafka.topic"),
new SimpleStringSchema(), props
);
//TODO 5.7:设置自动递交offset保存到检查点
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
//TODO 6)将kafka消费者对象添加到环境中
DataStream<String> dataStreamSource = env.addSource(kafkaConsumer);
//打印输出测试
dataStreamSource.print();
//TODO 13)启动作业,运行任务
env.execute();
}
}
2 测试实时ETL消费数据
启动zookeeper集群
zkServer.sh start
启动hadoop集群
启动kafka集群
| cd /export/servers/kafka
| nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
启动kafka生产者命令行工具
bin/kafka-console-producer.sh --broker-list node01:9092 --topic vehicledata-dev
测试数据
1 封装flink流处理环境初始化工具类
为了实现代码复用,提高开发效率,创建flink流式处理环境的工具类
1 创建基础任务处理基类
创建任务处理基类:cn.itcast.streaming.taskBaseTask.java
2 测试实时ETL消费数据
调用flink工具类初始化task作业
/**
* 需求:原始数据ETL操作
* flink消费kafka数据,将消费出来的数据进行转换、清洗、过滤以后,正常的数据需要写入到hbase和hdfs,异常的数据写入到hdfs中
* 1)正常数据写入hdfs和hbase
* 2)异常数据写入到hbase
*/
public class KafkaSourceDataTask extends BaseTask {
/**
* 入口方法
* @param args
*/
public static void main(String[] args) throws Exception {
/**
* 实现步骤:
* 1)初始化flink流式处理的开发环境
* 2)设置按照事件时间处理数据(划分窗口或者添加水印都需要事件时间)
* 3)开启checkpoint
* 3.1:设置每隔30秒钟开启checkpoint
* 3.2:设置检查点的model,exactly-once,保证数据消费一次,数据不重复消费
* 3.3:设置两次checkpoint时间间隔,避免两次间隔太近导致频繁checkpoint而出现业务处理能力下降
* 3.4:设置checkpoint的超时时间
* 3.5:设置checkpoint最大的尝试次数,同一个时间有几个checkpoint并行执行
* 3.6:设置checkpoint取消的时候,是否保留checkpoint,checkpoint默认会在job取消的时候删除
* 3.7:设置执行job过程中,保存检查点错误时,job不失败
* 3.8:设置检查点存储的位置,使用rocksDBStateBackend,存储到本地+hdfs分布式文件,增量检查点
* 4)设置任务的重启策略(固定延迟重启策略、失败率重启策略、无重启策略)
* 5)创建flink消费kafka数据的对象,指定kafka的参数信息
* 5.1:设置kafka集群地址
* 5.2:设置消费者组id
* 5.3:设置kafka的分区感知(动态监测)
* 5.4:设置key和value的反序列化
* 5.5:设置自动递交offset位置策略
* 5.6:创建kafka的消费者实例
* 5.7:设置自动递交offset到保存到检查点
* 6)将kafka消费者对象添加到环境中
* 7)将json字符串解析成对象
* 8)获取到异常的数据
* 9)获取到正常的数据
* 10)将异常的数据写入到hdfs中(StreamingFileSink、BucketingSink)
* StreamingFileSink是flink1.10的新特性,而flink1.8.1版本,是没有这个功能的,因此只能BucketingSink
* 11)将正常的数据写入到hdfs中
* 12)将正常的数据写入到hbase中
* 13)启动作业,运行任务
*/
//TODO 1)初始化flink流式处理的开发环境
StreamExecutionEnvironment env = getEnv(KafkaSourceDataTask.class.getSimpleName());
//TODO 6)将kafka消费者对象添加到环境中
DataStream<String> dataStreamSource= createKafkaStream(SimpleStringSchema.class);
//打印输出测试
dataStreamSource.print();
//TODO 13)启动作业,运行任务
env.execute();
}
}
- 点赞
- 收藏
- 关注作者
评论(0)