A First Look at Flink Internals and the Unified Stream/Batch API (Part 2) v2
Today's Goals
- Stream processing concepts (understand)
- Program structure: data sources (Source) (master)
- Program structure: data transformations (Transformation) (master)
- Program structure: data sinks (Sink) (master)
- Flink connectors (Connectors) (understand)
### Kafka
+ Start position for consumption (the available options are sketched right after this list)

+ Automatic discovery of new partitions and topics by the consumer

+ Setting FlinkKafkaConsumer properties
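
A minimal sketch of the start-position options, assuming a `FlinkKafkaConsumer` instance named `consumer` like the one created in the full example below; in practice only one of these calls is used:

~~~java
// Where the consumer starts reading when the job has no restored state:
consumer.setStartFromGroupOffsets();   // default: committed group offsets (falls back to auto.offset.reset)
consumer.setStartFromEarliest();       // earliest available offset, ignoring committed offsets
consumer.setStartFromLatest();         // latest offset, ignoring committed offsets
consumer.setStartFromTimestamp(1620000000000L); // first record whose timestamp >= the given epoch millis
~~~

The full consumer example follows.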

~~~java
package cn.itcast.sz22.day02;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
/**
 * Author itcast
 * Date 2021/5/5 17:23
 * Requirement: use FlinkKafkaConsumer from flink-connector-kafka_2.12 to consume data from Kafka for a WordCount.
 * The following need to be set:
 * 1. the topic to subscribe to
 * 2. the deserialization schema
 * 3. consumer property - cluster address
 * 4. consumer property - consumer group id (a default is used if unset, but defaults are hard to manage)
 * 5. consumer property - offset reset policy, e.g. earliest/latest...
 * 6. dynamic partition discovery (so Flink notices when the number of Kafka partitions changes/grows!)
 * 7. if checkpointing is not enabled, auto commit of offsets can be configured; once checkpointing is
 *    covered later, offsets are committed to the checkpoint and to Kafka's offsets topic on each checkpoint
 */
public class FlinkKafkaConsumerDemo {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpointing every 5s
        env.enableCheckpointing(5000);
        // 2. Source
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset", "latest");
        // starts a background thread that re-checks Kafka's partitions every 5s
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("flink_kafka",
                new SimpleStringSchema(), props);
        consumer.setStartFromEarliest();
        DataStreamSource<String> source = env.addSource(consumer).setParallelism(1);
        source.print();
        env.execute();
    }
}
~~~
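
With checkpointing enabled as in the example, the consumer does not need Kafka's client-side auto commit: the legacy FlinkKafkaConsumer can commit offsets back to Kafka whenever a checkpoint completes. A minimal sketch, assuming the `env` and `consumer` variables from the example above (the exactly-once mode shown is an illustrative choice, not part of the original code):

~~~java
// commit offsets to Kafka as part of each completed checkpoint
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); // requires org.apache.flink.streaming.api.CheckpointingMode
consumer.setCommitOffsetsOnCheckpoints(true); // enable.auto.commit is ignored once checkpointing is on
~~~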
### Redis
+ Flink-Sink-Redis
+ Example: compute word counts and write the result to Redis
~~~java
package cn.itcast.sz22.day02;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
 * Author itcast
 * Date 2021/5/5 18:03
 * Desc WordCount over a socket stream; the result is saved to Redis via a FlinkJedisPoolConfig.
 *      Note: the data is stored in Redis as a hash (i.e. a map):
 *      key        value
 *      WordCount  (word, count)
 */
public class FlinkRedisSink {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Source: read lines from a socket
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        // 3. Transformation
        // 3.1 split each line into words
        DataStream<String> flatMapDS = source.flatMap((String value, Collector<String> out) ->
                        Arrays.stream(value.split(" "))
                                .forEach(out::collect))
                .returns(Types.STRING);
        // 3.2 map each word to (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS = flatMapDS
                .map((word) -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // 3.3 group by word and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapDS.keyBy(t -> t.f0).sum(1);
        // 4. Sink: before creating the RedisSink, build the RedisConfig (standalone Redis here)
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("node1").build();
        result.addSink(new RedisSink<Tuple2<String, Integer>>(config, new RedisMapperEx()));
        // 5. execute
        env.execute();
    }
    /**
     * Maps each (word, count) tuple to an HSET command: HSET hashOpt <word> <count>
     */
    public static class RedisMapperEx implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "hashOpt");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1 + "";
        }
    }
}
~~~
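
The mapper above writes every word as a field of a single hash, so the result can be checked in redis-cli with `HGETALL hashOpt`. If each word should instead be stored as its own plain string key, only the command description changes. A minimal sketch under that assumption (the class name `RedisStringMapper` is illustrative and not part of the original code; it relies on the same imports as the example above):

~~~java
// Writes each (word, count) tuple as: SET <word> <count>
public static class RedisStringMapper implements RedisMapper<Tuple2<String, Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        // SET needs no additional key, unlike HSET which needs the hash name
        return new RedisCommandDescription(RedisCommand.SET);
    }
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0;                 // the word becomes the Redis key
    }
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        return String.valueOf(data.f1); // the count becomes the stored value
    }
}
~~~

It plugs in the same way as the original mapper: `result.addSink(new RedisSink<>(config, new RedisStringMapper()));`.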