实时即未来,大数据项目车联网之实时ETL任务消费数据

举报
Maynor学长 发表于 2022/10/20 14:18:52 2022/10/20
【摘要】 1 实时ETL任务消费数据 1 消费数据解析逻辑创建流式环境,设置检查点、kafka分区发现、任务重启策略,数据积压根据kafka属性配置创建FlinkKafkaConsumer,消费kafka数据根据kafka数据获得DataStream,进行json解析消费数据中的数据解析逻辑:n 得到解析成功的数据,称为正确数据u 原始文本为一行可成功解析的json格式数据,且数据中存在不可或缺...

1 实时ETL任务消费数据

1 消费数据解析逻辑

创建流式环境,设置检查点、kafka分区发现、任务重启策略,数据积压

根据kafka属性配置创建FlinkKafkaConsumer,消费kafka数据

根据kafka数据获得DataStream,进行json解析

消费数据中的数据解析逻辑:

n 得到解析成功的数据,称为正确数据

u 原始文本为一行可成功解析的json格式数据,且数据中存在不可或缺字段vin、terminalTime

n 得到解析失败的数据,称为异常数据

u json格式不正确;缺少必要字段,导致数据格式不正确的原因:

网络原因,车辆行驶在信号差的地方,数据传输不完整;

数据传输过程中,数据会传TSP造成数据丢失;

数据采集终端设备故障,导致数据部分丢失

数据入库,正确数据入hive与hbase库各一份,错误数据入hive

n 先入hdfs,再关联hive表

/
 * 需求:原始数据ETL操作
 * flink消费kafka数据,将消费出来的数据进行转换、清洗、过滤以后,正常的数据需要写入到hbase和hdfs,异常的数据写入到hdfs中
 * 1)正常数据写入hdfs和hbase
 * 2)异常数据写入到hbase
 */
public class KafkaSourceDataTask {
     /
     * 入口方法
     * @param args
     */
    public static void main(String[] args) throws Exception {
        /
         * 实现步骤:
         * 1)初始化flink流式处理的开发环境
         * 2)设置按照事件时间处理数据(划分窗口或者添加水印都需要事件时间)
         * 3)开启checkpoint
         *      3.1:设置每隔30秒钟开启checkpoint
         *      3.2:设置检查点的model,exactly-once,保证数据消费一次,数据不重复消费
         *      3.3:设置两次checkpoint时间间隔,避免两次间隔太近导致频繁checkpoint而出现业务处理能力下降
         *      3.4:设置checkpoint的超时时间
         *      3.5:设置checkpoint最大的尝试次数,同一个时间有几个checkpoint并行执行
         *      3.6:设置checkpoint取消的时候,是否保留checkpoint,checkpoint默认会在job取消的时候删除
         *      3.7:设置执行job过程中,保存检查点错误时,job不失败
         *      3.8:设置检查点存储的位置,使用rocksDBStateBackend,存储到本地+hdfs分布式文件,增量检查点
         * 4)设置任务的重启策略(固定延迟重启策略、失败率重启策略、无重启策略)
         * 5)创建flink消费kafka数据的对象,指定kafka的参数信息
         *      5.1:设置kafka集群地址
         *      5.2:设置消费者组id
         *      5.3:设置kafka的分区感知(动态监测)
         *      5.4:设置key和value的反序列化
         *      5.5:设置自动递交offset位置策略
         *      5.6:创建kafka的消费者实例
         *      5.7:设置自动递交offset到保存到检查点
         * 6)将kafka消费者对象添加到环境中
         * 7)将json字符串解析成对象
         * 8)获取到异常的数据
         * 9)获取到正常的数据
         * 10)将异常的数据写入到hdfs中(StreamingFileSinkBucketingSink*      StreamingFileSink是flink1.10的新特性
         * 11)将正常的数据写入到hdfs中
         * 12)将正常的数据写入到hbase中
         * 13)启动作业,运行任务
         */
        //加载conf.properties配置文件,返回ParameterTool工具类对象
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(BaseTask.class.getClassLoader().getResourceAsStream("conf.properties"));

        //TODO 1)初始化flink流式处理的开发环境
        System.setProperty("HADOOP_USER_NAME", "root");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置全局的参数
        env.getConfig().setGlobalJobParameters(parameterTool);
        //TODO 2)设置按照事件时间处理数据(划分窗口或者添加水印都需要事件时间)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //TODO 3)开启checkpoint
        //TODO 3.1:设置每隔30秒钟开启checkpoint
        env.enableCheckpointing(30*1000);
        //TODO 3.2:设置检查点的model,exactly-once,保证数据消费一次,数据不重复消费
       env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //TODO 3.3:设置两次checkpoint时间间隔,避免两次间隔太近导致频繁checkpoint而出现业务处理能力下降
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);
        //TODO 3.4:设置checkpoint的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(20000);
        //TODO 3.5:设置checkpoint最大的尝试次数,同一个时间有几个checkpoint并行执行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //TODO 3.6:设置checkpoint取消的时候,是否保留checkpoint,checkpoint默认会在job取消的时候删除
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //TODO 3.7:设置执行job过程中,保存检查点错误时,job不失败
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        //TODO 3.8:设置检查点存储的位置,使用rocksDBStateBackend,存储到本地+hdfs分布式文件,增量检查点
        String bashHdfsUri = parameterTool.getRequired("hdfsUri");
        try {
            env.setStateBackend(new RocksDBStateBackend(bashHdfsUri+"/flink/checkpoint/"+KafkaSourceDataTask.class.getSimpleName()));
        } catch (IOException e) {
            e.printStackTrace();
        }
        //TODO 4)设置任务的重启策略(固定延迟重启策略、失败率重启策略、无重启策略)
        env.setRestartStrategy(RestartStrategies.noRestart());

        //TODO 5)创建flink消费kafka数据的对象,指定kafka的参数信息
        Properties props = new Properties();
        //TODO     5.1:设置kafka集群地址
        props.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
        //TODO     5.2:设置消费者组id
        props.setProperty("group.id", "KafkaSourceDataTask04");
        //TODO     5.3:设置kafka的分区感知(动态监测)
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        //TODO     5.5:设置自动递交offset位置策略
        props.setProperty("auto.offset.reset", parameterTool.get("auto.offset.reset", "earliest"));
        //5.不自动提交偏移量,交给flink的checkpoint处理哦
        props.setProperty("enable.auto.commit", parameterTool.get("enable.auto.commit", "false"));
        //TODO     5.6:创建kafka的消费者实例
        FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<String>(
                parameterTool.getRequired("kafka.topic"),
                new SimpleStringSchema(), props
        );
        //TODO     5.7:设置自动递交offset保存到检查点
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);

        //TODO 6)将kafka消费者对象添加到环境中
        DataStream<String> dataStreamSource = env.addSource(kafkaConsumer);

        //打印输出测试
        dataStreamSource.print();

        //TODO 13)启动作业,运行任务
        env.execute();
    }
}

2 测试实时ETL消费数据

启动zookeeper集群

zkServer.sh start

启动hadoop集群

start-dfs.sh

启动kafka集群

| cd /export/servers/kafka
| nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

启动kafka生产者命令行工具

bin/kafka-console-producer.sh --broker-list node01:9092 --topic vehicledata-dev

测试数据

image-20221009171056308

image-20221009171101876

1 封装flink流处理环境初始化工具类

为了实现代码复用,提高开发效率,创建flink流式处理环境的工具类

1 创建基础任务处理基类

创建任务处理基类:cn.itcast.streaming.taskBaseTask.java

2 测试实时ETL消费数据

调用flink工具类初始化task作业

img

img

/**
 * 需求:原始数据ETL操作
 * flink消费kafka数据,将消费出来的数据进行转换、清洗、过滤以后,正常的数据需要写入到hbase和hdfs,异常的数据写入到hdfs中
 * 1)正常数据写入hdfs和hbase
 * 2)异常数据写入到hbase
 */
public class KafkaSourceDataTask extends BaseTask {
     /**
     * 入口方法
     * @param args
     */
    public static void main(String[] args) throws Exception {
        /**
         * 实现步骤:
         * 1)初始化flink流式处理的开发环境
         * 2)设置按照事件时间处理数据(划分窗口或者添加水印都需要事件时间)
         * 3)开启checkpoint
         *      3.1:设置每隔30秒钟开启checkpoint
         *      3.2:设置检查点的model,exactly-once,保证数据消费一次,数据不重复消费
         *      3.3:设置两次checkpoint时间间隔,避免两次间隔太近导致频繁checkpoint而出现业务处理能力下降
         *      3.4:设置checkpoint的超时时间
         *      3.5:设置checkpoint最大的尝试次数,同一个时间有几个checkpoint并行执行
         *      3.6:设置checkpoint取消的时候,是否保留checkpoint,checkpoint默认会在job取消的时候删除
         *      3.7:设置执行job过程中,保存检查点错误时,job不失败
         *      3.8:设置检查点存储的位置,使用rocksDBStateBackend,存储到本地+hdfs分布式文件,增量检查点
         * 4)设置任务的重启策略(固定延迟重启策略、失败率重启策略、无重启策略)
         * 5)创建flink消费kafka数据的对象,指定kafka的参数信息
         *      5.1:设置kafka集群地址
         *      5.2:设置消费者组id
         *      5.3:设置kafka的分区感知(动态监测)
         *      5.4:设置key和value的反序列化
         *      5.5:设置自动递交offset位置策略
         *      5.6:创建kafka的消费者实例
         *      5.7:设置自动递交offset到保存到检查点
         * 6)将kafka消费者对象添加到环境中
         * 7)将json字符串解析成对象
         * 8)获取到异常的数据
         * 9)获取到正常的数据
         * 10)将异常的数据写入到hdfs中(StreamingFileSink、BucketingSink)
         *      StreamingFileSink是flink1.10的新特性,而flink1.8.1版本,是没有这个功能的,因此只能BucketingSink
         * 11)将正常的数据写入到hdfs中
         * 12)将正常的数据写入到hbase中
         * 13)启动作业,运行任务
         */

        //TODO 1)初始化flink流式处理的开发环境
        StreamExecutionEnvironment env = getEnv(KafkaSourceDataTask.class.getSimpleName());

        //TODO 6)将kafka消费者对象添加到环境中
        DataStream<String> dataStreamSource= createKafkaStream(SimpleStringSchema.class);
        //打印输出测试
        dataStreamSource.print();

        //TODO 13)启动作业,运行任务
        env.execute();
    }
}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。