2021年大数据Flink(十六):流批一体API Connectors ​​​​​​​​​​​​​​Redis

举报
Lansonli 发表于 2021/09/28 22:41:01 2021/09/28
【摘要】 目录 Redis API 使用RedisCommand设置数据结构类型时和redis结构对应关系 需求 代码实现 Redis API 通过flink 操作redis 其实我们可以通过传统的redis 连接池Jpoools 进行redis 的相关操作,但是flink 提供了专门操作redis 的RedisSink,使用起...

目录

Redis

API

使用RedisCommand设置数据结构类型时和redis结构对应关系

需求

代码实现


Redis

API

通过flink 操作redis 其实我们可以通过传统的redis 连接池Jpoools 进行redis 的相关操作,但是flink 提供了专门操作redis 的RedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink 如何使用。

https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

 

RedisSink 核心类是RedisMapper 是一个接口,使用时我们要编写自己的redis 操作类实现这个接口中的三个方法,如下所示

1.getCommandDescription() :

设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型

2.String getKeyFromData(T data):

设置value 中的键值对key的值

3.String getValueFromData(T data);

设置value 中的键值对value的值

 

使用RedisCommand设置数据结构类型时和redis结构对应关系

Data Type

Redis Command [Sink]

HASH

HSET

LIST

RPUSH, LPUSH

SET

SADD

PUBSUB

PUBLISH

STRING

SET

HYPER_LOG_LOG

PFADD

SORTED_SET

ZADD

SORTED_SET

ZREM

 

需求

将Flink集合中的数据通过自定义Sink保存到Redis

 

代码实现


  
  1. package cn.itcast.connectors;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.tuple.Tuple;
  4. import org.apache.flink.api.java.tuple.Tuple2;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import org.apache.flink.streaming.api.datastream.KeyedStream;
  7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.streaming.connectors.redis.RedisSink;
  10. import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
  11. import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
  12. import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
  13. import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
  14. import org.apache.flink.util.Collector;
  15. /**
  16.  * Author itcast
  17.  * Desc
  18.  * 需求:
  19.  * 接收消息并做WordCount,
  20.  * 最后将结果保存到Redis
  21.  * 注意:存储到Redis的数据结构:使用hash也就是map
  22.  * key            value
  23.  * WordCount    (单词,数量)
  24.  */
  25. public class ConnectorsDemo_Redis {
  26.     public static void main(String[] args) throws Exception {
  27.         //1.env
  28.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  29.         //2.Source
  30.         DataStream<String> linesDS = env.socketTextStream("node1", 9999);
  31.         //3.Transformation
  32.         //3.1切割并记为1
  33.         SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  34.             @Override
  35.             public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
  36.                 String[] words = value.split(" ");
  37.                 for (String word : words) {
  38.                     out.collect(Tuple2.of(word, 1));
  39.                 }
  40.             }
  41.         });
  42.         //3.2分组
  43.         KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
  44.         //3.3聚合
  45.         SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
  46.         //4.Sink
  47.         result.print();
  48.         // * 最后将结果保存到Redis
  49.         // * 注意:存储到Redis的数据结构:使用hash也就是map
  50.         // * key            value
  51.         // * WordCount      (单词,数量)
  52.         //-1.创建RedisSink之前需要创建RedisConfig
  53.         //连接单机版Redis
  54.         FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
  55.         //连接集群版Redis
  56.         //HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379)));
  57.         //FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();
  58.         //连接哨兵版Redis
  59.         //Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));
  60.         //FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();
  61.         //-3.创建并使用RedisSink
  62.         result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
  63.         //5.execute
  64.         env.execute();
  65.     }
  66.     /**
  67.      * -2.定义一个Mapper用来指定存储到Redis中的数据结构
  68.      */
  69.     public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
  70.         @Override
  71.         public RedisCommandDescription getCommandDescription() {
  72.             return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
  73.         }
  74.         @Override
  75.         public String getKeyFromData(Tuple2<String, Integer> data) {
  76.             return data.f0;
  77.         }
  78.         @Override
  79.         public String getValueFromData(Tuple2<String, Integer> data) {
  80.             return data.f1.toString();
  81.         }
  82.     }
  83. }

 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/116245738

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。