2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二.五)v2

举报
Maynor学长 发表于 2022/07/24 11:40:39 2022/07/24
【摘要】 Kafka 连接方式Kafka 是消息队列需求:通过 Flink 将数据元素写入(producer)到 Kafka 中~~~java  package cn.itcast.flink.sink;    import com.alibaba.fastjson.JSON;  import lombok.AllArgsConstructor;  import lombok.Data;  impo...

Kafka 连接方式

  • Kafka 是消息队列

  • 需求:

通过 Flink 将数据元素写入(producer)到 Kafka 中

~~~java
  package cn.itcast.flink.sink;
  
  import com.alibaba.fastjson.JSON;
  import lombok.AllArgsConstructor;
  import lombok.Data;
  import lombok.NoArgsConstructor;
  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import org.apache.flink.streaming.api.datastream.DataStreamSource;
  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
  import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
  import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
  import org.apache.kafka.clients.producer.ProducerConfig;
  
  import java.util.Properties;
  
  /**
   * Author itcast
   * Date 2021/6/17 16:46
   * 需求: 将数据元素封装成 JSON字符串 生产到 Kafka 中
   * 步骤:
   *
   */
  public class KafkaProducerDemo {
      public static void main(String[] args) throws Exception {
          //1.env
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          //2.Source 生成一个元素 Student
          DataStreamSource<Student> studentDS = env.fromElements(new Student(102, “Oking”, 25));
          //3.Transformation
          //注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
          //3.1 map 方法 将 Student转换成字符串
          SingleOutputStreamOperator<String> mapDS = studentDS.map(new MapFunction<Student, String>() {
              @Override
              public String map(Student value) throws Exception {
                  //可以直接调用JSON的toJsonString,也可以转为JSON
                  String json = JSON.toJSONString(value);
                  return json;
              }
          });
  
          //4.Sink
          Properties props = new Properties();
          props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “192.168.88.161:9092”);
          //根据参数实例化 FlinkKafkaProducer
          //4.1如果不需要复杂的参数设置,只需要将数据存储到 kafka 消息队列中,使用第一个重载方法
          //  如果需要设置复杂的 kafka 的配置的时候, 使用除了第一个之外的重载方法
          //  如果需要设置仅一次语义 Semantic ,可以使用最后两个
          /FlinkKafkaProducer producer = new FlinkKafkaProducer(
                  “192.168.88.161:9092,192.168.88.162:9092,192.168.88.163:9092”,
                  “flink_kafka”,
                  new SimpleStringSchema()
          );
/
          FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
                  “flink_kafka”,
                  new KafkaSerializationSchemaWrapper(
                          “flink_kafka”,
                          new FlinkFixedPartitioner(),
                          false,
                          new SimpleStringSchema()
                  ),
                  props,
                  //支持仅一次语义的方式进行提交数据
                  FlinkKafkaProducer.Semantic.EXACTLY_ONCE
          );
  
          mapDS.addSink(producer);
          // ds.addSink 落地到kafka集群中
          //5.execute
          env.execute();
          //测试 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
      }
  
      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      public static class Student {
          private Integer id;
          private String name;
          private Integer age;
      }
  }
  
  ~~~

从 kafka 集群中消费数据

  • 需求

读取 kafka 中的数据到控制台

  • 开发步骤

~~~java
  /**
   * Author itcast
   * Date 2021/6/17 16:46
   * 需求: 将数据元素封装成 JSON字符串 生产到 Kafka 中
   * 步骤:
   *
   */
  public class KafkaProducerDemo {
      public static void main(String[] args) throws Exception {
          //1.env
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          //2.Source 生成一个元素 Student
          DataStreamSource<Student> studentDS = env.fromElements(new Student(104, “chaoxian”, 25));
          //3.Transformation
          //注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
          //3.1 map 方法 将 Student转换成字符串
          SingleOutputStreamOperator<String> mapDS = studentDS.map(new MapFunction<Student, String>() {
              @Override
              public String map(Student value) throws Exception {
                  //可以直接调用JSON的toJsonString,也可以转为JSON
                  String json = JSON.toJSONString(value);
                  return json;
              }
          });
  
          //4.Sink
          Properties props = new Properties();
          props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “192.168.88.161:9092”);
          //根据参数实例化 FlinkKafkaProducer
          //4.1如果不需要复杂的参数设置,只需要将数据存储到 kafka 消息队列中,使用第一个重载方法
          //  如果需要设置复杂的 kafka 的配置的时候, 使用除了第一个之外的重载方法
          //  如果需要设置仅一次语义 Semantic ,可以使用最后两个
          /FlinkKafkaProducer producer = new FlinkKafkaProducer(
                  “192.168.88.161:9092,192.168.88.162:9092,192.168.88.163:9092”,
                  “flink_kafka”,
                  new SimpleStringSchema()
          );
/
          FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
                  “flink_kafka”,
                  new KafkaSerializationSchemaWrapper(
                          “flink_kafka”,
                          new FlinkFixedPartitioner(),
                          false,
                          new SimpleStringSchema()
                  ),
                  props,
                  //支持仅一次语义的方式进行提交数据
                  FlinkKafkaProducer.Semantic.EXACTLY_ONCE
          );
  
          mapDS.addSink(producer);
          // ds.addSink 落地到kafka集群中
          //5.execute
          env.execute();
          //测试 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
      }
  
      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      public static class Student {
          private Integer id;
          private String name;
          private Integer age;
      }
  }
  ~~~

Flink写入到 Redis 数据库

  • Redis 是支持缓存的内存数据库,支持持久化

  • 使用场景
      1. 热数据处理 , 缓存机制
      2. 去重
      3. 五种数据类型 String Hash set Zset List

  • 需求:

通过 Flink 将数据写入到 Redis 中

~~~java
  package cn.itcast.flink.sink;
  
  import org.apache.flink.api.common.functions.FlatMapFunction;
  import org.apache.flink.api.java.tuple.Tuple;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.datastream.KeyedStream;
  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.connectors.redis.RedisSink;
  import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
  import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
  import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
  import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
  import org.apache.flink.util.Collector;
  
  /**
   * Author itcast
   * Desc
   * 需求:
   * 接收消息并做WordCount,
   * 最后将结果保存到Redis
   * 注意:存储到Redis的数据结构:使用hash也就是map
   * key            value
   * WordCount    (单词,数量)
   /
  public class ConnectorsDemo_Redis {
      public static void main(String[] args) throws Exception {
          //1.env 执行环境
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          //2.Source 从 socket 中读取数据
          DataStream<String> linesDS = env.socketTextStream(“192.168.88.163”, 9999);
  
          //3.Transformation
          //3.1切割并记为1
          SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS
                  .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
              @Override
              public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                  String[] words = value.split(" ");
                  for (String word : words) {
                      out.collect(Tuple2.of(word, 1));
                  }
              }
          });
          //3.2分组
          KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
          //3.3聚合
          SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
  
          //4.Sink
          result.print();
          // * 最后将结果保存到Redis
          // * 注意:存储到Redis的数据结构:使用hash也就是map
          // * key            value
          // * WordCount      (单词,数量)
  
          //-1.创建RedisSink之前需要创建RedisConfig
          //连接单机版Redis
          FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                  .setHost(“192.168.88.163”)
                  .setDatabase(2)
                  .build();
          //-3.创建并使用RedisSink
          result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
  
          //5.execute
          env.execute();
      }
  
      /
*
       * -2.定义一个Mapper用来指定存储到Redis中的数据结构
       */
      public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
          @Override
          public RedisCommandDescription getCommandDescription() {
              //使用哪一种数据类型, key:WordCount
              return new RedisCommandDescription(RedisCommand.HSET, “WordCount”);
          }
          @Override
          public String getKeyFromData(Tuple2<String, Integer> data) {
              // 存储数据的 key
              return data.f0;
          }
          @Override
          public String getValueFromData(Tuple2<String, Integer> data) {
              // 存储数据的 value
              return data.f1.toString();
          }
      }
  }
  ~~~

问题

  • vmware 打开镜像文件 15.5.x 升级为 16.1.0 , 可以升级为

  • fromSequece(1,10) , CPU 12线程, from <= to

设置的并行度大于生成的数据, 并行度为12, 生成数据只有 10 个,报这个。

  • Flink Standalone HA 高可用

jobmanager -> log

image-20210617145345934

总结

以上便是2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二.五)

愿你读过之后有自己的收获,如果有收获不妨一键三连一下~
在这里插入图片描述

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。