【技术分享】FlinkSQL消费Kafka写Redis
- 前提条件
MRS集群中包含kafka\flink\redis且各个服务运行正常
- 数据流向&业务逻辑
本样例是将kafka读的数据通过Flink解析后,统计不同account账号总消费金额,然后将结果缓存进redis,通过Redis查询最终的消费统计结果。
- 样例数据
{"account": "A6","costs":"35"}
{"account": "A7","costs":"36"}
{"account": "A8","costs":"37"}
{"account": "A9","costs":"38"}
{"account": "A6","costs":"35"}
{"account": "A7","costs":"36"}
- 操作步骤
1、登录FIM,创建Redis集群
2、设置Redis参数REDIS_SECURITY_ENABLED值为false,保存并重启redis服务
3、设置kafka的参数allow.everyone.if.no.acl.found值为true,保存并重启kafka服务
4、给本次实验的集群用户,绑定具有FlinkServer管理操作权限
5、登录FlinkServer webui界面,创建FlinkSQL任务
6、编写FlinkSQL,校验语法格式
CREATE TABLE kafka_source (
account varchar(10),
costs int,
ts AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'topic_test',
'properties.bootstrap.servers' = 'x.x.x.x:21005',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);CREATE SINK STREAM redis_sink(
account varchar,
costs int,
PRIMARY KEY(account)
) WITH (
'isSafeMode' = 'false',
'clusterAddress' = 'x:22400',
'redistype' = 'String',
'type' = 'Redis',
'isSSLMode' = 'false',
'ttlDurationSecond' = '86400',
'keyPrefix' = 'mykey'
);
INSERT INTO
redis_sink
select
account,
SUM(costs)
FROM
kafka_source
GROUP BY
TUMBLE(ts, INTERVAL '10' SECOND),
account;
7、启动任务
8、生成数据
kafka-console-producer.sh --broker-list x.x.x.x:21007 --topic topic_test --producer.config /opt/client/Kafka/kafka/config/producer.properties
- 结果验证
上述样例数据总共生成A7账户3次,每次消费金额36,经过Flink计算后总共108,查看Redis A7账户结果如下:
- 点赞
- 收藏
- 关注作者
评论(0)