2021年大数据Flink(十六):流批一体API Connectors Redis
目录
使用RedisCommand设置数据结构类型时和redis结构对应关系
Redis
API
通过flink 操作redis 其实我们可以通过传统的redis 连接池Jpoools 进行redis 的相关操作,但是flink 提供了专门操作redis 的RedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink 如何使用。
https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
RedisSink 核心类是RedisMapper 是一个接口,使用时我们要编写自己的redis 操作类实现这个接口中的三个方法,如下所示
1.getCommandDescription() :
设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型
2.String getKeyFromData(T data):
设置value 中的键值对key的值
3.String getValueFromData(T data);
设置value 中的键值对value的值
使用RedisCommand设置数据结构类型时和redis结构对应关系
Data Type |
Redis Command [Sink] |
HASH |
HSET |
LIST |
RPUSH, LPUSH |
SET |
SADD |
PUBSUB |
PUBLISH |
STRING |
SET |
HYPER_LOG_LOG |
PFADD |
SORTED_SET |
ZADD |
SORTED_SET |
ZREM |
需求
将Flink集合中的数据通过自定义Sink保存到Redis
代码实现
-
package cn.itcast.connectors;
-
-
import org.apache.flink.api.common.functions.FlatMapFunction;
-
import org.apache.flink.api.java.tuple.Tuple;
-
import org.apache.flink.api.java.tuple.Tuple2;
-
import org.apache.flink.streaming.api.datastream.DataStream;
-
import org.apache.flink.streaming.api.datastream.KeyedStream;
-
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
import org.apache.flink.streaming.connectors.redis.RedisSink;
-
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
-
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
-
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
-
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
-
import org.apache.flink.util.Collector;
-
-
/**
-
* Author itcast
-
* Desc
-
* 需求:
-
* 接收消息并做WordCount,
-
* 最后将结果保存到Redis
-
* 注意:存储到Redis的数据结构:使用hash也就是map
-
* key value
-
* WordCount (单词,数量)
-
*/
-
public class ConnectorsDemo_Redis {
-
public static void main(String[] args) throws Exception {
-
//1.env
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
//2.Source
-
DataStream<String> linesDS = env.socketTextStream("node1", 9999);
-
-
//3.Transformation
-
//3.1切割并记为1
-
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
-
@Override
-
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
-
String[] words = value.split(" ");
-
for (String word : words) {
-
out.collect(Tuple2.of(word, 1));
-
}
-
}
-
});
-
//3.2分组
-
KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
-
//3.3聚合
-
SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
-
-
//4.Sink
-
result.print();
-
// * 最后将结果保存到Redis
-
// * 注意:存储到Redis的数据结构:使用hash也就是map
-
// * key value
-
// * WordCount (单词,数量)
-
-
//-1.创建RedisSink之前需要创建RedisConfig
-
//连接单机版Redis
-
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
-
//连接集群版Redis
-
//HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379)));
-
//FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();
-
//连接哨兵版Redis
-
//Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));
-
//FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();
-
-
//-3.创建并使用RedisSink
-
result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
-
-
//5.execute
-
env.execute();
-
}
-
-
/**
-
* -2.定义一个Mapper用来指定存储到Redis中的数据结构
-
*/
-
public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
-
@Override
-
public RedisCommandDescription getCommandDescription() {
-
return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
-
}
-
@Override
-
public String getKeyFromData(Tuple2<String, Integer> data) {
-
return data.f0;
-
}
-
@Override
-
public String getValueFromData(Tuple2<String, Integer> data) {
-
return data.f1.toString();
-
}
-
}
-
}
文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。
原文链接:lansonli.blog.csdn.net/article/details/116245738
- 点赞
- 收藏
- 关注作者
评论(0)