实时大数据技术flink之Connector
Connector
Flink DataStream 连接器核心框架
三种连接器均遵循 Flink DataStream“环境准备→数据接入→转换处理→结果输出→执行提交” 的标准化流程,核心共性如下:
环境初始化:统一通过StreamExecutionEnvironment.getExecutionEnvironment()获取执行环境,并支持setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)自动适配流 / 批处理模式,降低模式切换成本。
Sink 核心逻辑:均通过addSink()方法对接外部系统,连接器底层封装了数据序列化、连接池管理、故障重试等细节,上层仅需关注业务参数配置。
配置化接入:外部系统的连接信息(如地址、端口、账号密码)均通过结构化配置(如JdbcConnectionOptions、Properties、FlinkJedisPoolConfig)传入,便于维护和环境切换。
JDBC
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/jdbc/
- 数据写入:通过JdbcSink.sink()实现流数据批量写入关系型数据库,支持自定义 SQL 语句与参数映射,适用于结构化数据持久化场景(如业务数据落地、统计结果存储)。
- 连接配置:通过JdbcConnectionOptionsBuilder指定数据库 URL(jdbc:mysql://host:port/db)、驱动类(com.mysql.jdbc.Driver)、用户名和密码,确保与数据库建立稳定连接。
- 批量写入优化:通过JdbcExecutionOptions.builder().withBatchSize(2)设置批量提交阈值,当分区数据量达到阈值时触发写入,减少数据库 IO 次数,提升性能。
- 参数映射:通过匿名内部类实现(ps, t) -> {ps.setString(1,t.getName()); ps.setInt(2,t.getAge());},将流中Student对象的字段与 SQL 占位符绑定,避免 SQL 注入风险。
- 流数据实时写入 MySQL、PostgreSQL 等关系型数据库,如用户行为日志落地、实时报表数据存储。
Flink官方提供了JdbcSink
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class JDBCDemo {
public static void main(String[] args) throws Exception {
//TODO 1.env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 2.source-加载数据
DataStream<Student> studentDS = env.fromElements(new Student(null, "lucy", 18));
//TODO 3.transformation-数据转换处理
//TODO 4.sink-数据输出
studentDS.addSink(JdbcSink.sink(
"INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
(ps, t) -> {
ps.setString(1,t.getName());
ps.setInt(2,t.getAge());
//分区数据量到达2条时,触发写入操作
},JdbcExecutionOptions.builder().withBatchSize(2).build()
,new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://hadoop10:3306/test1")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build()));
//TODO 5.execute-执行
env.execute();
}
}
Kafka
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/
- 双向数据交互:支持通过FlinkKafkaConsumer从 Kafka Topic 消费数据,通过FlinkKafkaProducer将处理后的数据写入 Kafka Topic,实现流数据的 “接入 - 处理 - 分发” 闭环,是实时 ETL 的核心组件。
- 消费端配置:
- 基础连接:bootstrap.servers指定 Kafka 集群地址,group.id指定消费者组,确保消费位置的协同管理。
- 消费策略:auto.offset.reset(如latest从最新位置消费)、enable.auto.commit(自动提交 offset)控制消费行为,flink.partition-discovery.interval-millis支持动态发现 Kafka 新增分区。
- 序列化:SimpleStringSchema将 Kafka 消息体反序列化为 String,适用于 JSON、日志等文本类数据。
- 生产端配置:复用消费端Properties,指定目标 Topic(如topic2),通过SimpleStringSchema将处理后的 String 数据序列化后写入 Kafka。
- ETL 逻辑:通过filter()函数筛选包含 “success” 的日志数据,实现实时数据清洗,符合 “消费 Topic1→过滤→写入 Topic2” 的业务需求。
- 实时日志采集与分发(如用户操作日志从 Topic1 过滤后分发到 Topic2 供下游分析)、流式数据解耦(Flink 作为中间处理层连接上游生产与下游消费)。
-
需求
从Kafka的topic1中消费日志数据,并做实时ETL,将状态为success的数据写入到Kafka的topic2中
-
代码实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
/**
* Desc 演示Flink-DataStream-Connectors-Kafka
* 从Kafka的主题1中消费日志数据,并做实时ETL,将状态为success的数据写入到Kafka的主题2中
*/
public class KafkaDemo {
public static void main(String[] args) throws Exception {
//TODO 1.env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 2.source-加载数据
//从kafka的topic1消费数据
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop10:9092");
properties.setProperty("group.id", "g1");
//下面可选
properties.setProperty("auto.offset.reset","latest");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "2000");
properties.setProperty("flink.partition-discovery.interval-millis","5000");
//上面可选
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties);
DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
//TODO 3.transformation-数据转换处理
SingleOutputStreamOperator<String> result = kafkaDS.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.contains("success");
}
});
//TODO 4.sink-数据输出
FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("topic2", new SimpleStringSchema(), properties);
result.addSink(kafkaSink);
//TODO 5.execute-执行
env.execute();
}
}
//1.准备topic1和topic2
//2.启动kafka
//3.往topic1发送如下数据
//kafka-console-producer.sh --broker-list hadoop:9092 --topic topic1
//log:2020-10-10 success xxx
//log:2020-10-10 success xxx
//log:2020-10-10 success xxx
//log:2020-10-10 fail xxx
//4.观察topic2的数据
Redis
https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
- 数据结构化存储:通过RedisSink将 Flink 计算结果(如 WordCount 统计值)写入 Redis,支持 Redis 多种数据结构(如 Hash、String、List),适用于实时计数、缓存更新等场景。
- 连接配置:通过FlinkJedisPoolConfig.Builder().setHost("hadoop10")指定 Redis 服务地址,底层自动维护 Jedis 连接池,避免频繁创建连接的开销。
- 数据结构映射:
- 自定义RedisMapper接口实现,通过getCommandDescription()指定 Redis 命令(如RedisCommand.HSET)与顶层 Key(如wc_result),此处使用 Hash 结构存储 WordCount 结果,Hash 的 Key 为单词,Value 为计数。
- getKeyFromData()和getValueFromData()分别从Tuple2<String, Integer>(单词 - 计数对)中提取 Hash 的 Key 和 Value,实现数据与 Redis 结构的精准映射。
- 计算逻辑:通过flatMap()(分词)→map()(转 <单词,1>)→keyBy()(按单词分组)→sum(1)(聚合计数)完成 WordCount 计算,结果同时打印到控制台并写入 Redis。
- 实时指标统计(如直播间在线人数、商品实时销量存储到 Redis)、临时结果缓存(避免重复计算,提升下游查询效率)。
需求:将实时统计的WordCount结果写入到Redis
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
public class RedisDemo {
public static void main(String[] args) throws Exception {
//TODO 1.env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 2.source-加载数据
DataStream<String> socketDS = env.socketTextStream("hadoop10", 8889);
//TODO 3.transformation-数据转换处理
//3.1对每一行数据进行分割并压扁
DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
});
//3.2每个单词记为<单词,1>
DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
//3.3分组
KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//3.4聚合
DataStream<Tuple2<String, Integer>> result = keyedDS.sum(1);
//TODO 4.sink-数据输出
result.print();
//将结果输出到Redis
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop10").build();//指定Redis的地址
RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
result.addSink(redisSink);
//TODO 5.execute-执行
env.execute();
}
//自定义Mapper,指定要使用的Redis的命令和key:value
public static class MyRedisMapper implements RedisMapper<Tuple2<String, Integer>>{
//获取命令描述,也就是要使用什么命令将结果写入到Redis
//可以使用下面的格式
//key:单词,value:数量
//key:wc_result,hash:k:单词,v:数量
//复习:Redis的数据结构:
//key:value
//key是固定的String类型, value可以是String/Hash/List/Set/SortSet
/**
Hash
key value
wc_result 单词 出现的次数
单词 出现的次数
命令:hset wc_result 单词 出现的次数
**/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "wc_result");
}
@Override
public String getKeyFromData(Tuple2<String, Integer> t) {
return t.f0;
}
@Override
public String getValueFromData(Tuple2<String, Integer> t) {
return t.f1.toString();
}
}
}
关键差异与选型建议
连接器
|
核心优势
|
注意事项
|
选型场景
|
JDBC
|
结构化数据持久化,支持事务(部分数据库)
|
批量写入阈值需根据数据库性能调整,避免单次写入过大导致超时
|
需长期存储、需事务保证的结构化数据(如业务订单)
|
Kafka
|
高吞吐、低延迟,支持动态分区发现
|
需合理配置消费者组与 offset 提交策略,避免数据重复消费
|
实时数据分发、ETL 中间层、上下游解耦
|
Redis
|
高性能读写,支持多种数据结构
|
需注意 Redis 内存限制,避免计数结果过大导致内存溢出
|
实时计数、临时缓存、高频查询指标(如 WordCount、实时排名)
|
通用注意事项
- 连接稳定性:三种连接器均需确保外部系统(数据库、Kafka、Redis)服务可用,生产环境建议配置连接超时重试、故障转移(如 Kafka 集群、Redis 主从)。
- 序列化选择:根据数据类型选择合适的序列化器(如 Kafka 的SimpleStringSchema适用于文本,AvroSchema适用于二进制结构化数据),避免序列化异常。
- 性能优化:JDBC 的批量大小、Redis 的连接池配置、Kafka 的分区数与消费者并行度需匹配,确保 Flink 算子并行度与外部系统处理能力对齐,避免瓶颈。
- 点赞
- 收藏
- 关注作者
评论(0)