2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二.五)v2
Kafka 连接方式
-
Kafka 是消息队列
-
需求:
通过 Flink 将数据元素写入(producer)到 Kafka 中
~~~java
package cn.itcast.flink.sink;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
/**
* Author itcast
* Date 2021/6/17 16:46
* 需求: 将数据元素封装成 JSON字符串 生产到 Kafka 中
* 步骤:
*
*/
public class KafkaProducerDemo {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source 生成一个元素 Student
DataStreamSource<Student> studentDS = env.fromElements(new Student(102, “Oking”, 25));
//3.Transformation
//注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
//3.1 map 方法 将 Student转换成字符串
SingleOutputStreamOperator<String> mapDS = studentDS.map(new MapFunction<Student, String>() {
@Override
public String map(Student value) throws Exception {
//可以直接调用JSON的toJsonString,也可以转为JSON
String json = JSON.toJSONString(value);
return json;
}
});
//4.Sink
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “192.168.88.161:9092”);
//根据参数实例化 FlinkKafkaProducer
//4.1如果不需要复杂的参数设置,只需要将数据存储到 kafka 消息队列中,使用第一个重载方法
// 如果需要设置复杂的 kafka 的配置的时候, 使用除了第一个之外的重载方法
// 如果需要设置仅一次语义 Semantic ,可以使用最后两个
/FlinkKafkaProducer producer = new FlinkKafkaProducer(
“192.168.88.161:9092,192.168.88.162:9092,192.168.88.163:9092”,
“flink_kafka”,
new SimpleStringSchema()
);/
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
“flink_kafka”,
new KafkaSerializationSchemaWrapper(
“flink_kafka”,
new FlinkFixedPartitioner(),
false,
new SimpleStringSchema()
),
props,
//支持仅一次语义的方式进行提交数据
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
mapDS.addSink(producer);
// ds.addSink 落地到kafka集群中
//5.execute
env.execute();
//测试 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Student {
private Integer id;
private String name;
private Integer age;
}
}
~~~
从 kafka 集群中消费数据
- 需求
读取 kafka 中的数据到控制台
- 开发步骤
~~~java
/**
* Author itcast
* Date 2021/6/17 16:46
* 需求: 将数据元素封装成 JSON字符串 生产到 Kafka 中
* 步骤:
*
*/
public class KafkaProducerDemo {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source 生成一个元素 Student
DataStreamSource<Student> studentDS = env.fromElements(new Student(104, “chaoxian”, 25));
//3.Transformation
//注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
//3.1 map 方法 将 Student转换成字符串
SingleOutputStreamOperator<String> mapDS = studentDS.map(new MapFunction<Student, String>() {
@Override
public String map(Student value) throws Exception {
//可以直接调用JSON的toJsonString,也可以转为JSON
String json = JSON.toJSONString(value);
return json;
}
});
//4.Sink
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, “192.168.88.161:9092”);
//根据参数实例化 FlinkKafkaProducer
//4.1如果不需要复杂的参数设置,只需要将数据存储到 kafka 消息队列中,使用第一个重载方法
// 如果需要设置复杂的 kafka 的配置的时候, 使用除了第一个之外的重载方法
// 如果需要设置仅一次语义 Semantic ,可以使用最后两个
/FlinkKafkaProducer producer = new FlinkKafkaProducer(
“192.168.88.161:9092,192.168.88.162:9092,192.168.88.163:9092”,
“flink_kafka”,
new SimpleStringSchema()
);/
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
“flink_kafka”,
new KafkaSerializationSchemaWrapper(
“flink_kafka”,
new FlinkFixedPartitioner(),
false,
new SimpleStringSchema()
),
props,
//支持仅一次语义的方式进行提交数据
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
mapDS.addSink(producer);
// ds.addSink 落地到kafka集群中
//5.execute
env.execute();
//测试 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Student {
private Integer id;
private String name;
private Integer age;
}
}
~~~
Flink写入到 Redis 数据库
-
Redis 是支持缓存的内存数据库,支持持久化
-
使用场景
1. 热数据处理 , 缓存机制
2. 去重
3. 五种数据类型 String Hash set Zset List -
需求:
通过 Flink 将数据写入到 Redis 中
~~~java
package cn.itcast.flink.sink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
/**
* Author itcast
* Desc
* 需求:
* 接收消息并做WordCount,
* 最后将结果保存到Redis
* 注意:存储到Redis的数据结构:使用hash也就是map
* key value
* WordCount (单词,数量)
/
public class ConnectorsDemo_Redis {
public static void main(String[] args) throws Exception {
//1.env 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.Source 从 socket 中读取数据
DataStream<String> linesDS = env.socketTextStream(“192.168.88.163”, 9999);
//3.Transformation
//3.1切割并记为1
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
});
//3.2分组
KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOneDS.keyBy(t -> t.f0);
//3.3聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
//4.Sink
result.print();
// * 最后将结果保存到Redis
// * 注意:存储到Redis的数据结构:使用hash也就是map
// * key value
// * WordCount (单词,数量)
//-1.创建RedisSink之前需要创建RedisConfig
//连接单机版Redis
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setHost(“192.168.88.163”)
.setDatabase(2)
.build();
//-3.创建并使用RedisSink
result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
//5.execute
env.execute();
}
/*
* -2.定义一个Mapper用来指定存储到Redis中的数据结构
*/
public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
@Override
public RedisCommandDescription getCommandDescription() {
//使用哪一种数据类型, key:WordCount
return new RedisCommandDescription(RedisCommand.HSET, “WordCount”);
}
@Override
public String getKeyFromData(Tuple2<String, Integer> data) {
// 存储数据的 key
return data.f0;
}
@Override
public String getValueFromData(Tuple2<String, Integer> data) {
// 存储数据的 value
return data.f1.toString();
}
}
}
~~~
问题
-
vmware 打开镜像文件 15.5.x 升级为 16.1.0 , 可以升级为
-
fromSequece(1,10) , CPU 12线程, from <= to
设置的并行度大于生成的数据, 并行度为12, 生成数据只有 10 个,报这个。
- Flink Standalone HA 高可用
jobmanager -> log
总结
以上便是2021年最新最全Flink系列教程_Flink原理初探和流批一体API(二.五)
愿你读过之后有自己的收获,如果有收获不妨一键三连一下~
- 点赞
- 收藏
- 关注作者
评论(0)