实时大数据技术flink之Connector

举报
tea_year 发表于 2025/08/31 19:45:15 2025/08/31
【摘要】 ConnectorFlink DataStream 连接器核心框架​三种连接器均遵循 Flink DataStream“环境准备→数据接入→转换处理→结果输出→执行提交” 的标准化流程,核心共性如下:​环境初始化:统一通过StreamExecutionEnvironment.getExecutionEnvironment()获取执行环境,并支持setRuntimeMode(RuntimeEx...

Connector

Flink DataStream 连接器核心框架​
三种连接器均遵循 Flink DataStream“环境准备→数据接入→转换处理→结果输出→执行提交” 的标准化流程,核心共性如下:​
环境初始化:统一通过StreamExecutionEnvironment.getExecutionEnvironment()获取执行环境,并支持setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)自动适配流 / 批处理模式,降低模式切换成本。​
Sink 核心逻辑:均通过addSink()方法对接外部系统,连接器底层封装了数据序列化、连接池管理、故障重试等细节,上层仅需关注业务参数配置。​
配置化接入:外部系统的连接信息(如地址、端口、账号密码)均通过结构化配置(如JdbcConnectionOptions、Properties、FlinkJedisPoolConfig)传入,便于维护和环境切换。

JDBC

https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/jdbc/

核心功能
  • 数据写入:通JdbcSink.sink()实现流数据批量写入关系型数据库,支持自定义 SQL 语句与参数映射,适用于结构化数据持久化场景(如业务数据落地、统计结果存储)。
关键代码解析
  • 连接配置:通JdbcConnectionOptionsBuilder指定数据库 URL(jdbc:mysql://host:port/db)、驱动类(com.mysql.jdbc.Driver)、用户名和密码,确保与数据库建立稳定连接。
  • 批量写入优化:通JdbcExecutionOptions.builder().withBatchSize(2)设置批量提交阈值,当分区数据量达到阈值时触发写入,减少数据库 IO 次数,提升性能。
  • 参数映射:通过匿名内部类实现(ps, t) -> {ps.setString(1,t.getName()); ps.setInt(2,t.getAge());},将流Student对象的字段与 SQL 占位符绑定,避免 SQL 注入风险。
适用场景
  • 流数据实时写入 MySQL、PostgreSQL 等关系型数据库,如用户行为日志落地、实时报表数据存储。

Flink官方提供了JdbcSink

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JDBCDemo {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
       //TODO 2.source-加载数据
       DataStream<Student> studentDS = env.fromElements(new Student(null, "lucy", 18));
       //TODO 3.transformation-数据转换处理
       //TODO 4.sink-数据输出
       studentDS.addSink(JdbcSink.sink(
               "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
              (ps, t) -> {
                   ps.setString(1,t.getName());
                   ps.setInt(2,t.getAge());
                   //分区数据量到达2条时,触发写入操作
              },JdbcExecutionOptions.builder().withBatchSize(2).build()    
              ,new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                      .withUrl("jdbc:mysql://hadoop10:3306/test1")
                      .withDriverName("com.mysql.jdbc.Driver")
                      .withUsername("root")
                      .withPassword("123456")
                      .build()));

       //TODO 5.execute-执行
       env.execute();
  }
}


Kafka

https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

核心功能
  • 双向数据交互:支持通FlinkKafkaConsumer从 Kafka Topic 消费数据,通FlinkKafkaProducer将处理后的数据写入 Kafka Topic,实现流数据的 “接入 - 处理 - 分发” 闭环,是实时 ETL 的核心组件。
关键代码解析
  • 消费端配置
  • 基础连接:bootstrap.servers指定 Kafka 集群地址,group.id指定消费者组,确保消费位置的协同管理。
  • 消费策略:auto.offset.resetlatest从最新位置消费)、enable.auto.commit(自动提交 offset)控制消费行为,flink.partition-discovery.interval-millis支持动态发现 Kafka 新增分区。
  • 序列化:SimpleStringSchema将 Kafka 消息体反序列化为 String,适用于 JSON、日志等文本类数据。
  • 生产端配置:复用消费Properties,指定目标 Topic(topic2),通SimpleStringSchema将处理后的 String 数据序列化后写入 Kafka。
  • ETL 逻辑:通filter()函数筛选包含 “success” 的日志数据,实现实时数据清洗,符合 “消费 Topic1→过滤→写入 Topic2” 的业务需求。
适用场景
  • 实时日志采集与分发(如用户操作日志从 Topic1 过滤后分发到 Topic2 供下游分析)、流式数据解耦(Flink 作为中间处理层连接上游生产与下游消费)。
  • 需求

从Kafka的topic1中消费日志数据,并做实时ETL,将状态为success的数据写入到Kafka的topic2中

  • 代码实现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
* Desc 演示Flink-DataStream-Connectors-Kafka
* 从Kafka的主题1中消费日志数据,并做实时ETL,将状态为success的数据写入到Kafka的主题2中
*/
public class KafkaDemo {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
       //TODO 2.source-加载数据
       //从kafka的topic1消费数据
       Properties properties = new Properties();
       properties.setProperty("bootstrap.servers", "hadoop10:9092");
       properties.setProperty("group.id", "g1");
       //下面可选
       properties.setProperty("auto.offset.reset","latest");
       properties.setProperty("enable.auto.commit", "true");
       properties.setProperty("auto.commit.interval.ms", "2000");
       properties.setProperty("flink.partition-discovery.interval-millis","5000");
       
       //上面可选
       FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties);
       DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

       //TODO 3.transformation-数据转换处理
       SingleOutputStreamOperator<String> result = kafkaDS.filter(new FilterFunction<String>() {
           @Override
           public boolean filter(String value) throws Exception {
               return value.contains("success");
          }
      });

       //TODO 4.sink-数据输出
       FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("topic2", new SimpleStringSchema(), properties);
       result.addSink(kafkaSink);

       //TODO 5.execute-执行
       env.execute();
  }
}
//1.准备topic1和topic2
//2.启动kafka
//3.往topic1发送如下数据
//kafka-console-producer.sh --broker-list hadoop:9092 --topic topic1
//log:2020-10-10 success xxx
//log:2020-10-10 success xxx
//log:2020-10-10 success xxx
//log:2020-10-10 fail xxx
//4.观察topic2的数据



Redis

https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

核心功能
  • 数据结构化存储:通RedisSink将 Flink 计算结果(如 WordCount 统计值)写入 Redis,支持 Redis 多种数据结构(如 Hash、String、List),适用于实时计数、缓存更新等场景。
关键代码解析
  • 连接配置:通FlinkJedisPoolConfig.Builder().setHost("hadoop10")指定 Redis 服务地址,底层自动维护 Jedis 连接池,避免频繁创建连接的开销。
  • 数据结构映射
  • 自定RedisMapper接口实现,通getCommandDescription()指定 Redis 命令(RedisCommand.HSET)与顶层 Key(wc_result),此处使用 Hash 结构存储 WordCount 结果,Hash 的 Key 为单词,Value 为计数。
  • getKeyFromData()getValueFromData()分别Tuple2<String, Integer>(单词 - 计数对)中提取 Hash 的 Key 和 Value,实现数据与 Redis 结构的精准映射。
  • 计算逻辑:通flatMap()(分词)→map()(转 <单词,1>)→keyBy()(按单词分组)→sum(1)(聚合计数)完成 WordCount 计算,结果同时打印到控制台并写入 Redis。
适用场景
  • 实时指标统计(如直播间在线人数、商品实时销量存储到 Redis)、临时结果缓存(避免重复计算,提升下游查询效率)。

需求:将实时统计的WordCount结果写入到Redis

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;


public class RedisDemo {
   public static void main(String[] args) throws Exception {
       //TODO 1.env-准备环境
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
       //TODO 2.source-加载数据
       DataStream<String> socketDS = env.socketTextStream("hadoop10", 8889);

       //TODO 3.transformation-数据转换处理
       //3.1对每一行数据进行分割并压扁
       DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {
           @Override
           public void flatMap(String value, Collector<String> out) throws Exception {
               String[] words = value.split(" ");
               for (String word : words) {
                   out.collect(word);
              }
          }
      });
       //3.2每个单词记为<单词,1>
       DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
           @Override
           public Tuple2<String, Integer> map(String value) throws Exception {
               return Tuple2.of(value, 1);
          }
      });
       //3.3分组
       KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
           @Override
           public String getKey(Tuple2<String, Integer> value) throws Exception {
               return value.f0;
          }
      });

       //3.4聚合
       DataStream<Tuple2<String, Integer>> result = keyedDS.sum(1);

       //TODO 4.sink-数据输出
       result.print();
       //将结果输出到Redis
       FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop10").build();//指定Redis的地址
       RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
       result.addSink(redisSink);

       //TODO 5.execute-执行
       env.execute();
  }

   //自定义Mapper,指定要使用的Redis的命令和key:value
   public static class MyRedisMapper implements RedisMapper<Tuple2<String, Integer>>{
       //获取命令描述,也就是要使用什么命令将结果写入到Redis
       //可以使用下面的格式
       //key:单词,value:数量
       //key:wc_result,hash:k:单词,v:数量
       //复习:Redis的数据结构:
       //key:value
       //key是固定的String类型, value可以是String/Hash/List/Set/SortSet
       /**
                      Hash      
           key       value
           wc_result 单词   出现的次数
                      单词   出现的次数      
                     
           命令:hset wc_result 单词 出现的次数
       **/
       @Override
       public RedisCommandDescription getCommandDescription() {
           return new RedisCommandDescription(RedisCommand.HSET, "wc_result");
      }

       @Override
       public String getKeyFromData(Tuple2<String, Integer> t) {
           return t.f0;
      }

       @Override
       public String getValueFromData(Tuple2<String, Integer> t) {
           return t.f1.toString();
      }
  }
   
}


关键差异与选型建议

连接器




核心优势




注意事项




选型场景




JDBC




结构化数据持久化,支持事务(部分数据库)




批量写入阈值需根据数据库性能调整,避免单次写入过大导致超时




需长期存储、需事务保证的结构化数据(如业务订单)




Kafka




高吞吐、低延迟,支持动态分区发现




需合理配置消费者组与 offset 提交策略,避免数据重复消费




实时数据分发、ETL 中间层、上下游解耦




Redis




高性能读写,支持多种数据结构




需注意 Redis 内存限制,避免计数结果过大导致内存溢出




实时计数、临时缓存、高频查询指标(如 WordCount、实时排名)









通用注意事项

  1. 连接稳定性:三种连接器均需确保外部系统(数据库、Kafka、Redis)服务可用,生产环境建议配置连接超时重试、故障转移(如 Kafka 集群、Redis 主从)。
  1. 序列化选择:根据数据类型选择合适的序列化器(如 Kafka SimpleStringSchema适用于文本,AvroSchema适用于二进制结构化数据),避免序列化异常。
  1. 性能优化:JDBC 的批量大小、Redis 的连接池配置、Kafka 的分区数与消费者并行度需匹配,确保 Flink 算子并行度与外部系统处理能力对齐,避免瓶颈。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。