Flink原理初探和流批一体API(二)v2

举报
Maynor学长 发表于 2022/07/22 21:55:53 2022/07/22
【摘要】 今日目标 流处理概念(理解) 程序结构之数据源Source(掌握) 程序结构之数据转换Transformation(掌握) 程序结构之数据落地Sink(掌握) Flink连接器Connectors(理解)

今日目标

  • 流处理概念(理解)
  • 程序结构之数据源Source(掌握)
  • 程序结构之数据转换Transformation(掌握)
  • 程序结构之数据落地Sink(掌握)
  • Flink连接器Connectors(理解)

### Kafka

+ 消费的起始位置

  ![image-20210505165659481](https://img-blog.csdnimg.cn/img_convert/37ebc82a6c9cf33f7f40c19a7a87f63e.png)

+ 消费者自动发现分区和topic

![image-20210505170020857](https://img-blog.csdnimg.cn/img_convert/e8200671840d31d499988327a3f9a1c9.png)

+ 设置FlinkKafkaConsumer 属性

  ![image-20210505173042799](https://img-blog.csdnimg.cn/img_convert/d67e6134801e36bd18e0a52683367a4e.png)

~~~java
package cn.itcast.sz22.day02;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * Author itcast
 * Date 2021/5/5 17:23
 * 需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消费Kafka中的数据做WordCount
 * 需要设置如下参数:
 * 1.订阅的主题
 * 2.反序列化规则
 * 3.消费者属性-集群地址
 * 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)
 * 5.消费者属性-offset重置规则,如earliest/latest...
 * 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)
 * 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中
 */
public class FlinkKafkaConsumerDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //开启checkpoint
        env.enableCheckpointing(5000);
        //2.Source
        Properties props  = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset","latest");
        props.setProperty("flink.partition-discovery.interval-millis","5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("flink_kafka"
                , new SimpleStringSchema(), props);
        consumer.setStartFromEarliest();
        DataStreamSource<String> source = env.addSource(consumer).setParallelism(1);
        source.print();
        env.execute();

    }
}
~~~

+ kafka 消费数据

  ~~~java
  package cn.itcast.sz22.day02;
  
  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import org.apache.flink.streaming.api.datastream.DataStreamSource;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
  
  import java.util.Properties;
  
  /**
   * Author itcast
   * Date 2021/5/5 17:23
   * 需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消费Kafka中的数据做WordCount
   * 需要设置如下参数:
   * 1.订阅的主题
   * 2.反序列化规则
   * 3.消费者属性-集群地址
   * 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)
   * 5.消费者属性-offset重置规则,如earliest/latest...
   * 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)
   * 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中
   */
  public class FlinkKafkaConsumerDemo {
      public static void main(String[] args) throws Exception {
          //1.env
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          //开启checkpoint
          env.enableCheckpointing(5000);
          //2.Source
          Properties props  = new Properties();
          props.setProperty("bootstrap.servers", "node1:9092");
          props.setProperty("group.id", "flink");
          props.setProperty("auto.offset.reset","latest");
          props.setProperty("flink.partition-discovery.interval-millis","5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况
          props.setProperty("enable.auto.commit", "true");
          props.setProperty("auto.commit.interval.ms", "2000");
          FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("flink_kafka"
                  , new SimpleStringSchema(), props);
          consumer.setStartFromEarliest();
          DataStreamSource<String> source = env.addSource(consumer).setParallelism(1);
          source.print();
          env.execute();
  
      }
  }
  
  ~~~

  

### redis

+ Flink-Sink-Redis

+ 案例 - 统计保存到 redis 

  ~~~java
  package cn.itcast.sz22.day02;
  
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.datastream.DataStreamSource;
  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.connectors.redis.RedisSink;
  import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
  import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
  import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
  import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
  import org.apache.flink.util.Collector;
  
  import java.util.Arrays;
  
  /**
   * Author itcast
   * Date 2021/5/5 18:03
   * Desc TODO
   */
  public class FlinkRedisSink {
      public static void main(String[] args) throws Exception {
          //1.env
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          //2.Source 通过socket获取数据源
          DataStreamSource<String> source = env.socketTextStream("node1", 9999);
          //3.Transformation
          //3.1切割并记为1
          DataStream<String> faltMapDS = source.flatMap((String value, Collector<String> out) ->
                  Arrays.stream(value.split(" "))
                          .forEach(out::collect))
                  .returns(Types.STRING);
          //O map(T value)
          SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS = faltMapDS
                  .map((word) -> Tuple2.of(word, 1))
                  .returns(Types.TUPLE(Types.STRING, Types.INT));
          SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapDS.keyBy(t -> t.f0).sum(1);
          //4.Sink
          FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                  .setHost("node1").build();
          result.addSink(new RedisSink<Tuple2<String, Integer>>(config, new RedisMapperEx()));
          env.execute();
          // * 最后将结果保存到Redis 实现 FlinkJedisPoolConfig
          // * 注意:存储到Redis的数据结构:使用hash也就是map
          // * key            value
          // * WordCount      (单词,数量)
          //-1.创建RedisSink之前需要创建RedisConfig
          //连接单机版Redis
  //5.execute
      }
      public static class RedisMapperEx implements RedisMapper<Tuple2<String, Integer>> {
          @Override
          public RedisCommandDescription getCommandDescription() {
              return new RedisCommandDescription(RedisCommand.HSET, "hashOpt");
          }
          @Override
          public String getKeyFromData(Tuple2<String, Integer> data) {
              return data.f0;
          }
          @Override
          public String getValueFromData(Tuple2<String, Integer> data) {
              return data.f1 + "";
          }
      }
  }
oolConfig config = new FlinkJedisPoolConfig.Builder()
                  .setHost("node1").build();
          result.addSink(new RedisSink<Tuple2<String, Integer>>(config, new RedisMapperEx()));
          env.execute();
          // * 最后将结果保存到Redis 实现 FlinkJedisPoolConfig
          // * 注意:存储到Redis的数据结构:使用hash也就是map
          // * key            value
          // * WordCount      (单词,数量)

          //-1.创建RedisSink之前需要创建RedisConfig
          //连接单机版Redis
  //5.execute
      }

      public static class RedisMapperEx implements RedisMapper<Tuple2<String, Integer>> {
          @Override
          public RedisCommandDescription getCommandDescription() {
              return new RedisCommandDescription(RedisCommand.HSET, "hashOpt");
          }

          @Override
          public String getKeyFromData(Tuple2<String, Integer> data) {
              return data.f0;
          }

          @Override
          public String getValueFromData(Tuple2<String, Integer> data) {
              return data.f1 + "";
          }
      }
  }
  ~~~

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。