
Maynor学长 发表于 2022/07/24 12:02:49 2022/07/24
1.1k+ 0 0
【摘要】 FlinkSQL 整合 HiveFlinkSQL 整合 Hive 数据仓库Flink1.9 之后支持 Hive 数据仓库, 在Flink1.12版本支持 Hive 可以在生产级环境使用Hive 使用Hive,外部调用的使用,开启两个服务hive --service metastorehive --service hiveserver2FlinkSQL 如何整合 Hive在环境变量中配置 H...

FlinkSQL 整合 Hive

  • FlinkSQL 整合 Hive 数据仓库

  • Flink1.9 之后支持 Hive 数据仓库, 在Flink1.12版本支持 Hive 可以在生产级环境使用

  • Hive 使用Hive,外部调用的使用,开启两个服务

    hive --service metastore

    hive --service hiveserver2

  • FlinkSQL 如何整合 Hive

    1. 在环境变量中配置 HADOOP_CLASSPATH=hadoop classpath

      vim /etc/profile

      source /etc/profile

    2. 上传 flink 依赖的 Hive jar包上传到 Flink/lib 目录下

      [root@node3 lib]# wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.11/1.12.3/flink-sql-connector-hive-2.2.0_2.11-1.12.3.jar
    3. 配置文件

      ① Hive/conf/hive-site.xml - 单节点 node3

      vim /export/server/hive/conf/hive-site.xml

      ② flink/conf/flink-conf.yaml - 三台节点 scp到三台节点上

    4. 开启 hive 的服务

      hive --service metastore

  • FLink shell 操作

    • Flink 启动 sql-client 的基础配置
    vim /export/server/flink/conf/sql-client-defaults.yaml
       - name: myhive
         type: hive
         hive-conf-dir: /export/server/hive/conf
         default-database: default
    • FlinkSQL 操作 Hive
    show catalogs;
    # myhive
    show catalog myhive
    # 列出hive中所有数据库
    show databases;
    # 使用指定的数据库 bigdata
    use bigdata;
    # 查询当前数据库中所有的表
    show tables;
    # 查询指定表信息
    desc person;
    select * from person;
  • Flink Java API 操作

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.catalog.hive.HiveCatalog;
    public class HiveDemo {
        public static void main(String[] args){
            EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
            TableEnvironment tableEnv = TableEnvironment.create(settings);
            String name            = "myhive";
            String defaultDatabase = "bigdata";
            String hiveConfDir = "./conf";
            //创建 Hive的catalog sql-client
            HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
            //注册catalog show catalogs 在sql-client-default.xml
            tableEnv.registerCatalog("myhive", hive);
            //使用注册的catalog  use catalog myhive
            String insertSQL = "insert into person values (6,'zhaoliu',30)";
            TableResult result = tableEnv.executeSql(insertSQL);
            //查看执行 Job 状态


  • 需求


  • 分析流程


  • 开发步骤

    package cn.itcast.flink;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.util.Collector;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Random;
    import java.util.UUID;
     * Author itcast
     * Date 2021/6/25 9:45
     * 开发步骤:
     * 1. 创建流执行环境 , 设置并行度
     * 2. 读取数据源, <userId,orderId,createTime> 数据源
     * 3. 转换操作 将数据源.keyBy(userId).process
     *  获取 MapState<orderId,createTime> 中间结果状态 state
     *  将当前的订单的数据信息进行处理 开启一个触发器
     *  onTimer 执行这个触发器,mapState 移除掉
    public class OrderAutoFarorableComment {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            /// Tuple3<用户id,订单id,订单完成时间>
            DataStreamSource<Tuple3<String, String, Long>> source = env.addSource(new MySource());
            //设置假如 interval=5秒,若用户未对订单做出评价,自动给与好评.
            //根据用户id进行分组,然后处理 TimerProcessFunction:KeyedProcessFunction
            .process(new TimerProcessFunction(5000L))
         * 自定义source实时产生订单数据Tuple3<用户id,订单id, 订单完成时间>
        public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
            private boolean flag = true;
            Random random = new Random();
            public void run(SourceContext<Tuple3<String, String, Long>> ctx) throws Exception {
                while (flag) {
                    String userId = random.nextInt(5) + "";
                    String orderId = UUID.randomUUID().toString();
                    long currentTimeMillis = System.currentTimeMillis();
                    ctx.collect(Tuple3.of(userId, orderId, currentTimeMillis));
            public void cancel() {
                flag = false;
         *  自定义处理函数用来给超时订单做自动好评!
         *  如一个订单进来:<订单id, 2020-10-10 12:00:00>
         *  那么该订单应该在12:00:00 + 5s 的时候超时!
         *  所以我们可以在订单进来的时候设置一个定时器,在订单时间 + interval的时候触发!
         *  KeyedProcessFunction<K, I, O>
         *  KeyedProcessFunction<String, Tuple3<用户id, 订单id, 订单生成时间>, Object>
         * @param <K> Type of the key. String
         * @param <I> Type of the input elements.  Tuple3<用户id,订单id, 订单完成时间>
         * @param <O> Type of the output elements.
        private static class TimerProcessFunction extends KeyedProcessFunction<String,Tuple3<String,String,Long>,Object> {
            MapState<String, Long> mapState;
            Long interval = 0L;
            public TimerProcessFunction(Long _interval){
                interval = _interval;
            //3.2在open 方法中获取 MapState
            public void open(Configuration parameters) throws Exception {
                MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<>("mapState", String.class, Long.class);
                //从当前上下文获取到 mapstate
                mapState = getRuntimeContext().getMapState(mapStateDesc);
            public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Object> out) throws Exception {
                String orderId = value.f1;
                Long orderFinish = value.f2;
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
                //我们这里没有接口调用,直接模拟查询订单,没有评价才给默认好评!并直接输出提示! 已经评价了,直接输出提示!
                Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
                    Map.Entry<String, Long> next = iterator.next();
                    //读出来订单id, 如果当前的这个订单id ,将数据库中的指定的 comment 这个字段将 null -> 五星好评
                    String orderId = next.getKey();
                    //正式环境 通过一个接口,传入 订单id -> 执行结果  updateByOrderId(orderId) =>
                    // update t_order_comment set comment='5' WHERE orderId = $'orderId'
                    boolean result = isEvaluation(orderId);
                    if (result) {//已评价
                        System.out.println("订单(orderid: " + orderId + ")在" + interval + "毫秒时间内已经评价,不做处理");
                    } else {//未评价
                        System.out.println("订单(orderid: " + orderId + ")在" + interval + "毫秒时间内未评价,系统自动给了默认好评!");
                    //已经被处理过的订单,要从 mapstate 中移除掉
            //模拟给 orderId 随机给是否已经点评,如果被2整除,已经点评过了,否则还没有点评
            private boolean isEvaluation(String key) {
                return key.hashCode() % 2 == 0;//随机返回订单是否已评价


  • Streaming File sink 落地到 HDFS 上, 无法正常写入到 HDFS
    1. 导入依赖,确定是否有问题
    2. 确定 hdfs 服务启动

//模拟给 orderId 随机给是否已经点评,如果被2整除,已经点评过了,否则还没有点评
private boolean isEvaluation(String key) {
return key.hashCode() % 2 == 0;//随机返回订单是否已评价

## 问题

+ Streaming File sink 落地到 HDFS 上, 无法正常写入到 HDFS
1. 导入依赖,确定是否有问题
2. 确定 hdfs 服务启动
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者








